This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ce8eeda196 Read API Source v2 (#25392)
4ce8eeda196 is described below

commit 4ce8eeda19699cc64ae8cf310267a478cfe9e4b8
Author: Vachan <[email protected]>
AuthorDate: Wed Feb 22 19:57:09 2023 +0000

    Read API Source v2 (#25392)
    
    * ReadAPI Source v2
    
    * Renamed the Source V2. Also added tests for the same.
    
    * v2 using OffsetBasedSource and OffsetBasedReader
    
    * Updating tests to have more sensible mock values.
    
    * Updated BqOption flag.
    
    * Simplifying `fractionConsumed` calculation.
    
    * Better variable names.
    
    * Minor refactoring.
    
    * Added a synchronized block in readNextRecord(). Also added comments to 
explain how OffsetBasedSource + RangeTracker is used.
    
    * Removed unnecessary synchronized block, added Javadoc and improved unit 
test coverage.
    
    * Consolidated code paths in `BigQueryIO` for Bundled and regular ReadAPI 
sources.
    
    * Lint fixes.
    
    * Minor Javadoc fix.
    
    * Fix StreamBundle creation logic and some minor code comment updates.
    
    * Updated logging.
    
    * Lint fixes.
    
    * ReadAPI Source v2
    
    * Renamed the Source V2. Also added tests for the same.
    
    * v2 using OffsetBasedSource and OffsetBasedReader
    
    * Updating tests to have more sensible mock values.
    
    * Updated BqOption flag.
    
    * Simplifying `fractionConsumed` calculation.
    
    * Better variable names.
    
    * Minor refactoring.
    
    * Added a synchronized block in readNextRecord(). Also added comments to 
explain how OffsetBasedSource + RangeTracker is used.
    
    * Removed unnecessary synchronized block, added Javadoc and improved unit 
test coverage.
    
    * Consolidated code paths in `BigQueryIO` for Bundled and regular ReadAPI 
sources.
    
    * Lint fixes.
    
    * Minor Javadoc fix.
    
    * Fix StreamBundle creation logic and some minor code comment updates.
    
    * Updated logging.
    
    * Lint fixes.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  359 +++-
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |   11 +
 .../io/gcp/bigquery/BigQueryStorageSourceBase.java |   65 +-
 .../BigQueryStorageStreamBundleSource.java         |  381 ++++
 ...eryIOStorageReadWithStreamBundleSourceTest.java | 2156 ++++++++++++++++++++
 5 files changed, 2867 insertions(+), 105 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 04a81821d8e..6745f7aceea 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -68,6 +68,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.extensions.avro.io.AvroSource;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
@@ -75,7 +76,9 @@ import org.apache.beam.sdk.extensions.gcp.util.Transport;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.fs.MoveOptions;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -1179,11 +1182,11 @@ public class BigQueryIO {
 
       // if both toRowFn and fromRowFn values are set, enable Beam schema 
support
       Pipeline p = input.getPipeline();
+      BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
       final BigQuerySourceDef sourceDef = createSourceDef();
 
       Schema beamSchema = null;
       if (getTypeDescriptor() != null && getToBeamRowFn() != null && 
getFromBeamRowFn() != null) {
-        BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
         beamSchema = sourceDef.getBeamSchema(bqOptions);
         beamSchema = getFinalSchema(beamSchema, getSelectedFields());
       }
@@ -1191,7 +1194,7 @@ public class BigQueryIO {
       final Coder<T> coder = inferCoder(p.getCoderRegistry());
 
       if (getMethod() == TypedRead.Method.DIRECT_READ) {
-        return expandForDirectRead(input, coder, beamSchema);
+        return expandForDirectRead(input, coder, beamSchema, bqOptions);
       }
 
       checkArgument(
@@ -1369,7 +1372,7 @@ public class BigQueryIO {
     }
 
     private PCollection<T> expandForDirectRead(
-        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+        PBegin input, Coder<T> outputCoder, Schema beamSchema, BigQueryOptions 
bqOptions) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
       if (tableProvider != null) {
@@ -1416,6 +1419,7 @@ public class BigQueryIO {
       //
 
       PCollectionView<String> jobIdTokenView;
+      PCollectionTuple tuple;
       PCollection<T> rows;
 
       if (!getWithTemplateCompatibility()) {
@@ -1446,108 +1450,46 @@ public class BigQueryIO {
         jobIdTokenView = jobIdTokenCollection.apply("ViewId", 
View.asSingleton());
 
         TupleTag<ReadStream> readStreamsTag = new TupleTag<>();
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
         TupleTag<ReadSession> readSessionTag = new TupleTag<>();
         TupleTag<String> tableSchemaTag = new TupleTag<>();
 
-        PCollectionTuple tuple =
-            jobIdTokenCollection.apply(
-                "RunQueryJob",
-                ParDo.of(
-                        new DoFn<String, ReadStream>() {
-                          @ProcessElement
-                          public void processElement(ProcessContext c) throws 
Exception {
-                            BigQueryOptions options =
-                                
c.getPipelineOptions().as(BigQueryOptions.class);
-                            String jobUuid = c.element();
-                            // Execute the query and get the destination table 
holding the results.
-                            // The getTargetTable call runs a new instance of 
the query and returns
-                            // the destination table created to hold the 
results.
-                            BigQueryStorageQuerySource<T> querySource =
-                                createStorageQuerySource(jobUuid, outputCoder);
-                            Table queryResultTable = 
querySource.getTargetTable(options);
-
-                            // Create a read session without specifying a 
desired stream count and
-                            // let the BigQuery storage server pick the number 
of streams.
-                            CreateReadSessionRequest request =
-                                CreateReadSessionRequest.newBuilder()
-                                    .setParent(
-                                        BigQueryHelpers.toProjectResourceName(
-                                            options.getBigQueryProject() == 
null
-                                                ? options.getProject()
-                                                : 
options.getBigQueryProject()))
-                                    .setReadSession(
-                                        ReadSession.newBuilder()
-                                            .setTable(
-                                                
BigQueryHelpers.toTableResourceName(
-                                                    
queryResultTable.getTableReference()))
-                                            .setDataFormat(DataFormat.AVRO))
-                                    .setMaxStreamCount(0)
-                                    .build();
-
-                            ReadSession readSession;
-                            try (StorageClient storageClient =
-                                
getBigQueryServices().getStorageClient(options)) {
-                              readSession = 
storageClient.createReadSession(request);
-                            }
-
-                            for (ReadStream readStream : 
readSession.getStreamsList()) {
-                              c.output(readStream);
-                            }
-
-                            c.output(readSessionTag, readSession);
-                            c.output(
-                                tableSchemaTag,
-                                
BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
-                          }
-                        })
-                    .withOutputTags(
-                        readStreamsTag, 
TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+        if (!bqOptions.getEnableBundling()) {
+          tuple =
+              createTupleForDirectRead(
+                  jobIdTokenCollection,
+                  outputCoder,
+                  readStreamsTag,
+                  readSessionTag,
+                  tableSchemaTag);
+          tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
+        } else {
+          tuple =
+              createTupleForDirectReadWithStreamBundle(
+                  jobIdTokenCollection,
+                  outputCoder,
+                  listReadStreamsTag,
+                  readSessionTag,
+                  tableSchemaTag);
+          
tuple.get(listReadStreamsTag).setCoder(ListCoder.of(ProtoCoder.of(ReadStream.class)));
+        }
 
-        tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
         tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
         tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
-
         PCollectionView<ReadSession> readSessionView =
             tuple.get(readSessionTag).apply("ReadSessionView", 
View.asSingleton());
         PCollectionView<String> tableSchemaView =
             tuple.get(tableSchemaTag).apply("TableSchemaView", 
View.asSingleton());
 
-        rows =
-            tuple
-                .get(readStreamsTag)
-                .apply(Reshuffle.viaRandomKey())
-                .apply(
-                    ParDo.of(
-                            new DoFn<ReadStream, T>() {
-                              @ProcessElement
-                              public void processElement(ProcessContext c) 
throws Exception {
-                                ReadSession readSession = 
c.sideInput(readSessionView);
-                                TableSchema tableSchema =
-                                    BigQueryHelpers.fromJsonString(
-                                        c.sideInput(tableSchemaView), 
TableSchema.class);
-                                ReadStream readStream = c.element();
-
-                                BigQueryStorageStreamSource<T> streamSource =
-                                    BigQueryStorageStreamSource.create(
-                                        readSession,
-                                        readStream,
-                                        tableSchema,
-                                        getParseFn(),
-                                        outputCoder,
-                                        getBigQueryServices());
-
-                                // Read all of the data from the stream. In 
the event that this work
-                                // item fails and is rescheduled, the same 
rows will be returned in
-                                // the same order.
-                                BoundedSource.BoundedReader<T> reader =
-                                    
streamSource.createReader(c.getPipelineOptions());
-                                for (boolean more = reader.start(); more; more 
= reader.advance()) {
-                                  c.output(reader.getCurrent());
-                                }
-                              }
-                            })
-                        .withSideInputs(readSessionView, tableSchemaView))
-                .setCoder(outputCoder);
+        if (!bqOptions.getEnableBundling()) {
+          rows =
+              createPCollectionForDirectRead(
+                  tuple, outputCoder, readStreamsTag, readSessionView, 
tableSchemaView);
+        } else {
+          rows =
+              createPCollectionForDirectReadWithStreamBundle(
+                  tuple, outputCoder, listReadStreamsTag, readSessionView, 
tableSchemaView);
+        }
       }
 
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -1593,6 +1535,235 @@ public class BigQueryIO {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, 
jobIdTokenView));
     }
 
+    private PCollectionTuple createTupleForDirectRead(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<ReadStream> readStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, ReadStream>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws 
Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table 
holding the results.
+                          // The getTargetTable call runs a new instance of 
the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = 
querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a 
desired stream count and
+                          // let the BigQuery storage server pick the number 
of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              
BigQueryHelpers.toTableResourceName(
+                                                  
queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) 
{
+                            readSession = 
storageClient.createReadSession(request);
+                          }
+
+                          for (ReadStream readStream : 
readSession.getStreamsList()) {
+                            c.output(readStream);
+                          }
+
+                          c.output(readSessionTag, readSession);
+                          c.output(
+                              tableSchemaTag,
+                              
BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                        }
+                      })
+                  .withOutputTags(
+                      readStreamsTag, 
TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+      return tuple;
+    }
+
+    private PCollectionTuple createTupleForDirectReadWithStreamBundle(
+        PCollection<String> jobIdTokenCollection,
+        Coder<T> outputCoder,
+        TupleTag<List<ReadStream>> listReadStreamsTag,
+        TupleTag<ReadSession> readSessionTag,
+        TupleTag<String> tableSchemaTag) {
+
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, List<ReadStream>>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws 
Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table 
holding the results.
+                          // The getTargetTable call runs a new instance of 
the query and returns
+                          // the destination table created to hold the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = 
querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a 
desired stream count and
+                          // let the BigQuery storage server pick the number 
of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent(
+                                      BigQueryHelpers.toProjectResourceName(
+                                          options.getBigQueryProject() == null
+                                              ? options.getProject()
+                                              : options.getBigQueryProject()))
+                                  .setReadSession(
+                                      ReadSession.newBuilder()
+                                          .setTable(
+                                              
BigQueryHelpers.toTableResourceName(
+                                                  
queryResultTable.getTableReference()))
+                                          .setDataFormat(DataFormat.AVRO))
+                                  .setMaxStreamCount(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) 
{
+                            readSession = 
storageClient.createReadSession(request);
+                          }
+                          int streamIndex = 0;
+                          int streamsPerBundle = 10;
+                          List<ReadStream> streamBundle = Lists.newArrayList();
+                          for (ReadStream readStream : 
readSession.getStreamsList()) {
+                            streamIndex++;
+                            streamBundle.add(readStream);
+                            if (streamIndex % streamsPerBundle == 0) {
+                              c.output(streamBundle);
+                              streamBundle = Lists.newArrayList();
+                            }
+                          }
+                          if (streamIndex % streamsPerBundle != 0) {
+                            c.output(streamBundle);
+                          }
+                          c.output(readSessionTag, readSession);
+                          c.output(
+                              tableSchemaTag,
+                              
BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                        }
+                      })
+                  .withOutputTags(
+                      listReadStreamsTag, 
TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+      return tuple;
+    }
+
+    private PCollection<T> createPCollectionForDirectRead(
+        PCollectionTuple tuple,
+        Coder<T> outputCoder,
+        TupleTag<ReadStream> readStreamsTag,
+        PCollectionView<ReadSession> readSessionView,
+        PCollectionView<String> tableSchemaView) {
+      PCollection<T> rows =
+          tuple
+              .get(readStreamsTag)
+              .apply(Reshuffle.viaRandomKey())
+              .apply(
+                  ParDo.of(
+                          new DoFn<ReadStream, T>() {
+                            @ProcessElement
+                            public void processElement(ProcessContext c) 
throws Exception {
+                              ReadSession readSession = 
c.sideInput(readSessionView);
+                              TableSchema tableSchema =
+                                  BigQueryHelpers.fromJsonString(
+                                      c.sideInput(tableSchemaView), 
TableSchema.class);
+                              ReadStream readStream = c.element();
+
+                              BigQueryStorageStreamSource<T> streamSource =
+                                  BigQueryStorageStreamSource.create(
+                                      readSession,
+                                      readStream,
+                                      tableSchema,
+                                      getParseFn(),
+                                      outputCoder,
+                                      getBigQueryServices());
+
+                              // Read all of the data from the stream. In the 
event that this work
+                              // item fails and is rescheduled, the same rows 
will be returned in
+                              // the same order.
+                              BoundedSource.BoundedReader<T> reader =
+                                  
streamSource.createReader(c.getPipelineOptions());
+                              for (boolean more = reader.start(); more; more = 
reader.advance()) {
+                                c.output(reader.getCurrent());
+                              }
+                            }
+                          })
+                      .withSideInputs(readSessionView, tableSchemaView))
+              .setCoder(outputCoder);
+
+      return rows;
+    }
+
+    private PCollection<T> createPCollectionForDirectReadWithStreamBundle(
+        PCollectionTuple tuple,
+        Coder<T> outputCoder,
+        TupleTag<List<ReadStream>> listReadStreamsTag,
+        PCollectionView<ReadSession> readSessionView,
+        PCollectionView<String> tableSchemaView) {
+      PCollection<T> rows =
+          tuple
+              .get(listReadStreamsTag)
+              .apply(Reshuffle.viaRandomKey())
+              .apply(
+                  ParDo.of(
+                          new DoFn<List<ReadStream>, T>() {
+                            @ProcessElement
+                            public void processElement(ProcessContext c) 
throws Exception {
+                              ReadSession readSession = 
c.sideInput(readSessionView);
+                              TableSchema tableSchema =
+                                  BigQueryHelpers.fromJsonString(
+                                      c.sideInput(tableSchemaView), 
TableSchema.class);
+                              List<ReadStream> streamBundle = c.element();
+
+                              BigQueryStorageStreamBundleSource<T> 
streamSource =
+                                  BigQueryStorageStreamBundleSource.create(
+                                      readSession,
+                                      streamBundle,
+                                      tableSchema,
+                                      getParseFn(),
+                                      outputCoder,
+                                      getBigQueryServices(),
+                                      1L);
+
+                              // Read all of the data from the stream. In the 
event that this work
+                              // item fails and is rescheduled, the same rows 
will be returned in
+                              // the same order.
+                              BoundedReader<T> reader =
+                                  
streamSource.createReader(c.getPipelineOptions());
+                              for (boolean more = reader.start(); more; more = 
reader.advance()) {
+                                c.output(reader.getCurrent());
+                              }
+                            }
+                          })
+                      .withSideInputs(readSessionView, tableSchemaView))
+              .setCoder(outputCoder);
+
+      return rows;
+    }
+
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index bf09bf4d9e3..938d131a0da 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
@@ -163,4 +165,13 @@ public interface BigQueryOptions
   Long getStorageWriteApiMaxRequestSize();
 
   void setStorageWriteApiMaxRequestSize(Long value);
+
+  @Experimental(Kind.UNSPECIFIED)
+  @Description(
+      "If set, BigQueryIO.Read will use the StreamBundle based"
+          + "implementation of the Read API Source")
+  @Default.Boolean(false)
+  Boolean getEnableBundling();
+
+  void setEnableBundling(Boolean value);
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index 27b88dc3960..834409062cc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -100,7 +100,7 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
   }
 
   @Override
-  public List<BigQueryStorageStreamSource<T>> split(
+  public List<? extends BoundedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     @Nullable Table targetTable = getTargetTable(bqOptions);
@@ -133,13 +133,18 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
       readSessionBuilder.setDataFormat(format);
     }
 
+    // Setting the  requested max stream count to 0, implies that the Read API 
backend will select
+    // an appropriate number of streams for the Session to produce reasonable 
throughput.
+    // This is required when using the Read API Source V2.
     int streamCount = 0;
-    if (desiredBundleSizeBytes > 0) {
-      long tableSizeBytes = (targetTable != null) ? targetTable.getNumBytes() 
: 0;
-      streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, 
MAX_SPLIT_COUNT);
-    }
+    if (!bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        long tableSizeBytes = (targetTable != null) ? 
targetTable.getNumBytes() : 0;
+        streamCount = (int) Math.min(tableSizeBytes / desiredBundleSizeBytes, 
MAX_SPLIT_COUNT);
+      }
 
-    streamCount = Math.max(streamCount, MIN_SPLIT_COUNT);
+      streamCount = Math.max(streamCount, MIN_SPLIT_COUNT);
+    }
 
     CreateReadSessionRequest createReadSessionRequest =
         CreateReadSessionRequest.newBuilder()
@@ -166,6 +171,25 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
       return ImmutableList.of();
     }
 
+    streamCount = readSession.getStreamsList().size();
+    int streamsPerBundle = 0;
+    double bytesPerStream = 0;
+    LOG.info(
+        "Estimated bytes this ReadSession will scan when all Streams are 
consumed: '{}'",
+        readSession.getEstimatedTotalBytesScanned());
+    if (bqOptions.getEnableBundling()) {
+      if (desiredBundleSizeBytes > 0) {
+        bytesPerStream =
+            (double) readSession.getEstimatedTotalBytesScanned() / 
readSession.getStreamsCount();
+        LOG.info("Estimated bytes each Stream will consume: '{}'", 
bytesPerStream);
+        streamsPerBundle = (int) Math.ceil(desiredBundleSizeBytes / 
bytesPerStream);
+      } else {
+        streamsPerBundle = (int) Math.ceil((double) streamCount / 10);
+      }
+      streamsPerBundle = Math.min(streamCount, streamsPerBundle);
+      LOG.info("Distributing '{}' Streams per StreamBundle.", 
streamsPerBundle);
+    }
+
     Schema sessionSchema;
     if (readSession.getDataFormat() == DataFormat.ARROW) {
       org.apache.arrow.vector.types.pojo.Schema schema =
@@ -180,18 +204,37 @@ abstract class BigQueryStorageSourceBase<T> extends 
BoundedSource<T> {
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + 
readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where 
it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), 
sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, 
bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    }
+    List<ReadStream> streamBundle = Lists.newArrayList();
+    List<BigQueryStorageStreamBundleSource<T>> sources = Lists.newArrayList();
     for (ReadStream readStream : readSession.getStreamsList()) {
+      streamIndex++;
+      streamBundle.add(readStream);
+      if (streamIndex % streamsPerBundle == 0) {
+        sources.add(
+            BigQueryStorageStreamBundleSource.create(
+                readSession, streamBundle, trimmedSchema, parseFn, 
outputCoder, bqServices, 1L));
+        streamBundle = Lists.newArrayList();
+      }
+    }
+    if (streamIndex % streamsPerBundle != 0) {
       sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, 
bqServices));
+          BigQueryStorageStreamBundleSource.create(
+              readSession, streamBundle, trimmedSchema, parseFn, outputCoder, 
bqServices, 1L));
     }
-
     return ImmutableList.copyOf(sources);
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java
new file mode 100644
index 00000000000..42e99b6aae3
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.beam.sdk.io.Source} representing a bundle of Streams in 
a BigQuery ReadAPI
+ * Session. This Source ONLY supports splitting at the StreamBundle level.
+ *
+ * <p>{@link BigQueryStorageStreamBundleSource} defines a split-point as the 
starting offset of each
+ * Stream. As a result, the number of valid split points in the Source is 
equal to the number of
+ * Streams in the StreamBundle and this Source does NOT support sub-Stream 
splitting.
+ *
+ * <p>Additionally, the underlying {@link 
org.apache.beam.sdk.io.range.OffsetRangeTracker} and
+ * {@link OffsetBasedSource} operate in the split point space and do NOT 
directly interact with the
+ * Streams constituting the StreamBundle. Consequently, fractional values used 
in
+ * `splitAtFraction()` are translated into StreamBundleIndices and the 
underlying RangeTracker
+ * handles the split operation by checking the validity of the split point. 
This has the following
+ * implications for the `splitAtFraction()` operation:
+ *
+ * <p>1. Fraction values that point to the "middle" of a Stream will be 
translated to the
+ * appropriate Stream boundary by the RangeTracker.
+ *
+ * <p>2. Once a Stream is being read from, the RangeTracker will only accept 
`splitAtFraction()`
+ * calls that point to StreamBundleIndices that are greater than the 
StreamBundleIndex of the
+ * current Stream.
+ *
+ * @param <T> Type of records represented by the source.
+ * @see OffsetBasedSource
+ * @see org.apache.beam.sdk.io.range.OffsetRangeTracker
+ * @see org.apache.beam.sdk.io.BlockBasedSource (semantically similar to {@link
+ *     BigQueryStorageStreamBundleSource})
+ */
class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {

  /**
   * Creates a source over {@code streamBundle}. The {@code tableSchema} is serialized to JSON up
   * front so the source itself remains serializable.
   */
  public static <T> BigQueryStorageStreamBundleSource<T> create(
      ReadSession readSession,
      List<ReadStream> streamBundle,
      TableSchema tableSchema,
      SerializableFunction<SchemaAndRecord, T> parseFn,
      Coder<T> outputCoder,
      BigQueryServices bqServices,
      long minBundleSize) {
    return new BigQueryStorageStreamBundleSource<>(
        readSession,
        streamBundle,
        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
        parseFn,
        outputCoder,
        bqServices,
        minBundleSize);
  }

  /**
   * Creates a new source with the same properties as this one, except with a different {@code
   * List<ReadStream>}.
   */
  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
    return new BigQueryStorageStreamBundleSource<>(
        readSession,
        newStreamBundle,
        jsonTableSchema,
        parseFn,
        outputCoder,
        bqServices,
        getMinBundleSize());
  }

  private final ReadSession readSession;
  private final List<ReadStream> streamBundle;
  // Table schema kept in JSON form so that this source stays serializable.
  private final String jsonTableSchema;
  private final SerializableFunction<SchemaAndRecord, T> parseFn;
  private final Coder<T> outputCoder;
  private final BigQueryServices bqServices;

  private BigQueryStorageStreamBundleSource(
      ReadSession readSession,
      List<ReadStream> streamBundle,
      String jsonTableSchema,
      SerializableFunction<SchemaAndRecord, T> parseFn,
      Coder<T> outputCoder,
      BigQueryServices bqServices,
      long minBundleSize) {
    // The offset range is the StreamBundle index space: [0, streamBundle.size()).
    super(0, streamBundle.size(), minBundleSize);
    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streams");
    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
  }

  @Override
  public Coder<T> getOutputCoder() {
    return outputCoder;
  }

  @Override
  public void populateDisplayData(DisplayData.Builder builder) {
    super.populateDisplayData(builder);
    builder
        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
    for (ReadStream readStream : streamBundle) {
      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
    }
  }

  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    // The size of stream source can't be estimated due to server-side liquid sharding.
    // TODO: Implement progress reporting.
    return 0L;
  }

  @Override
  public List<? extends OffsetBasedSource<T>> split(
      long desiredBundleSizeBytes, PipelineOptions options) {
    // This method is only called for initial splits. Since this class will always be a child source
    // of BigQueryStorageSourceBase, all splits here will be handled by `splitAtFraction()`. As a
    // result, this is a no-op.
    return ImmutableList.of(this);
  }

  @Override
  public long getMaxEndOffset(PipelineOptions options) throws Exception {
    // In split-point space the end offset is simply the number of Streams in the bundle.
    return this.streamBundle.size();
  }

  @Override
  public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
    List<ReadStream> newStreamBundle = streamBundle.subList((int) start, (int) end);
    return fromExisting(newStreamBundle);
  }

  @Override
  public BigQueryStorageStreamBundleReader<T> createReader(PipelineOptions options)
      throws IOException {
    return new BigQueryStorageStreamBundleReader<>(this, options.as(BigQueryOptions.class));
  }

  /**
   * Reader that consumes the Streams of a StreamBundle sequentially, one row at a time, and reports
   * progress in StreamBundle-index space.
   */
  public static class BigQueryStorageStreamBundleReader<T> extends OffsetBasedReader<T> {
    private static final Logger LOG =
        LoggerFactory.getLogger(BigQueryStorageStreamBundleReader.class);

    private final BigQueryStorageReader reader;
    private final SerializableFunction<SchemaAndRecord, T> parseFn;
    private final StorageClient storageClient;
    private final TableSchema tableSchema;

    private BigQueryStorageStreamBundleSource<T> source;
    private @Nullable BigQueryServerStream<ReadRowsResponse> responseStream = null;
    private @Nullable Iterator<ReadRowsResponse> responseIterator = null;
    private @Nullable T current = null;
    // Index of the Stream (within the bundle) currently being read; doubles as the reader offset.
    private int currentStreamBundleIndex;
    // Row offset within the current Stream; zero only before the first response of a Stream.
    private long currentStreamOffset;

    // Values used for progress reporting.
    private double fractionOfStreamBundleConsumed;

    private double progressAtResponseStart;
    private double progressAtResponseEnd;
    private long rowsConsumedFromCurrentResponse;
    private long totalRowsInCurrentResponse;

    private @Nullable TableReference tableReference;
    private @Nullable ServiceCallMetric serviceCallMetric;

    private BigQueryStorageStreamBundleReader(
        BigQueryStorageStreamBundleSource<T> source, BigQueryOptions options) throws IOException {
      super(source);
      this.source = source;
      this.reader = BigQueryStorageReaderFactory.getReader(source.readSession);
      this.parseFn = source.parseFn;
      this.storageClient = source.bqServices.getStorageClient(options);
      this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class);
      this.currentStreamBundleIndex = 0;
      this.fractionOfStreamBundleConsumed = 0d;
      this.progressAtResponseStart = 0d;
      this.progressAtResponseEnd = 0d;
      this.rowsConsumedFromCurrentResponse = 0L;
      this.totalRowsInCurrentResponse = 0L;
    }

    @Override
    public T getCurrent() throws NoSuchElementException {
      if (current == null) {
        throw new NoSuchElementException();
      }
      return current;
    }

    @Override
    protected long getCurrentOffset() throws NoSuchElementException {
      // The reader's offset is the index of the Stream being read, not a row position.
      return currentStreamBundleIndex;
    }

    @Override
    protected boolean isAtSplitPoint() throws NoSuchElementException {
      // Only the start of a Stream (offset 0 within it) is a valid split point.
      if (currentStreamOffset == 0) {
        return true;
      }
      return false;
    }

    @Override
    public boolean startImpl() throws IOException {
      return readNextStream();
    }

    @Override
    public boolean advanceImpl() throws IOException {
      Preconditions.checkStateNotNull(responseIterator);
      // NOTE(review): the offset advances by the size of the current response on every record,
      // so isAtSplitPoint() is true only before the first response of each Stream — confirm this
      // matches the intended split-point granularity.
      currentStreamOffset += totalRowsInCurrentResponse;
      return readNextRecord();
    }

    /**
     * Opens the next Stream in the bundle and reads its first record. Returns {@code false} once
     * every Stream in the bundle has been consumed.
     */
    private boolean readNextStream() throws IOException {
      BigQueryStorageStreamBundleSource<T> source = getCurrentSource();
      if (currentStreamBundleIndex == source.streamBundle.size()) {
        fractionOfStreamBundleConsumed = 1d;
        return false;
      }
      ReadRowsRequest request =
          ReadRowsRequest.newBuilder()
              .setReadStream(source.streamBundle.get(currentStreamBundleIndex).getName())
              .build();
      tableReference = BigQueryUtils.toTableReference(source.readSession.getTable());
      serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
      LOG.info(
          "Started BigQuery Storage API read from stream {}.",
          source.streamBundle.get(currentStreamBundleIndex).getName());
      responseStream = storageClient.readRows(request, source.readSession.getTable());
      responseIterator = responseStream.iterator();
      return readNextRecord();
    }

    /**
     * Reads the next record from the current response, pulling a new response (or moving to the
     * next Stream) when the current one is exhausted, and updates progress bookkeeping.
     */
    @RequiresNonNull("responseIterator")
    private boolean readNextRecord() throws IOException {
      Iterator<ReadRowsResponse> responseIterator = this.responseIterator;
      if (responseIterator == null) {
        LOG.info("Received null responseIterator for stream {}", currentStreamBundleIndex);
        return false;
      }
      while (reader.readyForNextReadResponse()) {
        if (!responseIterator.hasNext()) {
          // Current Stream is exhausted; move to the next one in the bundle.
          // NOTE(review): this block presumably guards against concurrent progress/split calls —
          // confirm against how the runner invokes getFractionConsumed()/splitAtFraction().
          synchronized (this) {
            currentStreamOffset = 0;
            currentStreamBundleIndex++;
          }
          return readNextStream();
        }

        ReadRowsResponse response;
        try {
          response = responseIterator.next();
          // Since we don't have a direct hook to the underlying
          // API call, record success every time we read a record successfully.
          if (serviceCallMetric != null) {
            serviceCallMetric.call("ok");
          }
        } catch (ApiException e) {
          // Occasionally the iterator will fail and raise an exception.
          // Capture it here and record the error in the metric.
          if (serviceCallMetric != null) {
            serviceCallMetric.call(e.getStatusCode().getCode().name());
          }
          throw e;
        }

        progressAtResponseStart = response.getStats().getProgress().getAtResponseStart();
        progressAtResponseEnd = response.getStats().getProgress().getAtResponseEnd();
        totalRowsInCurrentResponse = response.getRowCount();
        rowsConsumedFromCurrentResponse = 0L;

        checkArgument(
            totalRowsInCurrentResponse >= 0,
            "Row count from current response (%s) must be non-negative.",
            totalRowsInCurrentResponse);

        checkArgument(
            0f <= progressAtResponseStart && progressAtResponseStart <= 1f,
            "Progress at response start (%s) is not in the range [0.0, 1.0].",
            progressAtResponseStart);

        checkArgument(
            0f <= progressAtResponseEnd && progressAtResponseEnd <= 1f,
            "Progress at response end (%s) is not in the range [0.0, 1.0].",
            progressAtResponseEnd);
        reader.processReadRowsResponse(response);
      }

      SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema);

      current = parseFn.apply(schemaAndRecord);

      // Calculates the fraction of the current stream that has been consumed. This value is
      // calculated by interpolating between the fraction consumed value from the previous server
      // response (or zero if we're consuming the first response) and the fractional value in the
      // current response based on how many of the rows in the current response have been consumed.
      rowsConsumedFromCurrentResponse++;

      double fractionOfCurrentStreamConsumed =
          progressAtResponseStart
              + ((progressAtResponseEnd - progressAtResponseStart)
                  * (rowsConsumedFromCurrentResponse * 1.0 / totalRowsInCurrentResponse));

      // We now calculate the progress made over the entire StreamBundle by assuming that each
      // Stream in the StreamBundle has approximately the same amount of data. Given this, merely
      // counting the number of Streams that have been read and linearly interpolating with the
      // progress made in the current Stream gives us the overall StreamBundle progress.
      fractionOfStreamBundleConsumed =
          (currentStreamBundleIndex + fractionOfCurrentStreamConsumed) / source.streamBundle.size();
      return true;
    }

    @Override
    public synchronized void close() {
      // Because superclass cannot have preconditions around these variables, cannot use
      // @RequiresNonNull
      Preconditions.checkStateNotNull(storageClient);
      Preconditions.checkStateNotNull(reader);
      storageClient.close();
      reader.close();
    }

    @Override
    public synchronized BigQueryStorageStreamBundleSource<T> getCurrentSource() {
      return source;
    }

    @Override
    public synchronized Double getFractionConsumed() {
      return fractionOfStreamBundleConsumed;
    }
  }
}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadWithStreamBundleSourceTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadWithStreamBundleSourceTest.java
new file mode 100644
index 00000000000..fc1ccd3c891
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadWithStreamBundleSourceTest.java
@@ -0,0 +1,2156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static java.util.Arrays.asList;
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import com.google.api.services.bigquery.model.Streamingbuffer;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
+import com.google.cloud.bigquery.storage.v1.ArrowSchema;
+import com.google.cloud.bigquery.storage.v1.AvroRows;
+import com.google.cloud.bigquery.storage.v1.AvroSchema;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.DataFormat;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import com.google.cloud.bigquery.storage.v1.StreamStats;
+import com.google.cloud.bigquery.storage.v1.StreamStats.Progress;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.Text;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
+import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamBundleSource.BigQueryStorageStreamBundleReader;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import 
org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.FakeBigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.transforms.Convert;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.model.Statement;
+
+/**
+ * Tests for {@link BigQueryIO#readTableRows()} using {@link Method#DIRECT_READ} AND {@link
+ * BigQueryOptions#setEnableBundling(Boolean)} set to {@code true}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageReadWithStreamBundleSourceTest {
+
+  private transient PipelineOptions options;
+  private final transient TemporaryFolder testFolder = new TemporaryFolder();
+  private transient TestPipeline p;
+  private BufferAllocator allocator;
+
  // Chains the TemporaryFolder rule ahead of TestPipeline so the pipeline's temp location can
  // point inside the already-created folder, and forces EnableBundling on for every test.
  @Rule
  public final transient TestRule folderThenPipeline =
      new TestRule() {
        @Override
        public Statement apply(Statement base, Description description) {
          // We need to set up the temporary folder, and then set up the TestPipeline based on the
          // chosen folder. Unfortunately, since rule evaluation order is unspecified and unrelated
          // to field order, and is separate from construction, that requires manually creating this
          // TestRule.
          Statement withPipeline =
              new Statement() {
                @Override
                public void evaluate() throws Throwable {
                  options = TestPipeline.testingPipelineOptions();
                  options.as(BigQueryOptions.class).setProject("project-id");
                  // @ProjectOverride on the test method switches the billing project.
                  if (description.getAnnotations().stream()
                      .anyMatch(a -> a.annotationType().equals(ProjectOverride.class))) {
                    options.as(BigQueryOptions.class).setBigQueryProject("bigquery-project-id");
                  }
                  options
                      .as(BigQueryOptions.class)
                      .setTempLocation(testFolder.getRoot().getAbsolutePath());
                  // This suite exercises the StreamBundle (v2) source exclusively.
                  options.as(BigQueryOptions.class).setEnableBundling(true);
                  p = TestPipeline.fromOptions(options);
                  p.apply(base, description).evaluate();
                }
              };
          return testFolder.apply(withPipeline, description);
        }
      };
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  private final FakeDatasetService fakeDatasetService = new 
FakeDatasetService();
+
+  @Before
+  public void setUp() throws Exception {
+    FakeDatasetService.setUp();
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
  // Closing the allocator makes any leaked Arrow buffers fail the test.
  @After
  public void teardown() {
    allocator.close();
  }
+
+  @Test
+  public void testBuildTableBasedSource() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table");
+    checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", 
"table");
+    assertTrue(typedRead.getValidate());
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithoutValidation() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .withoutValidation();
+    checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", 
"table");
+    assertFalse(typedRead.getValidate());
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithDefaultProject() {
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("myDataset.myTable");
+    checkTypedReadTableObject(typedRead, null, "myDataset", "myTable");
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithTableReference() {
+    TableReference tableReference =
+        new TableReference()
+            .setProjectId("foo.com:project")
+            .setDatasetId("dataset")
+            .setTableId("table");
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from(tableReference);
+    checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", 
"table");
+  }
+
+  private void checkTypedReadTableObject(
+      TypedRead<?> typedRead, String project, String dataset, String table) {
+    assertEquals(project, typedRead.getTable().getProjectId());
+    assertEquals(dataset, typedRead.getTable().getDatasetId());
+    assertEquals(table, typedRead.getTable().getTableId());
+    assertNull(typedRead.getQuery());
+    assertEquals(Method.DIRECT_READ, typedRead.getMethod());
+  }
+
  /**
   * Combining a table read with a result-flattening preference (a query-only option) must fail at
   * pipeline-construction time.
   */
  @Test
  public void testBuildSourceWithTableAndFlatten() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(
        "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
            + " which only applies to queries");
    p.apply(
        "ReadMyTable",
        BigQueryIO.read(new TableRowParser())
            .withCoder(TableRowJsonCoder.of())
            .withMethod(Method.DIRECT_READ)
            .from("foo.com:project:dataset.table")
            .withoutResultFlattening());
    // Not reached when validation works: apply() above throws.
    p.run();
  }
+
  /**
   * Combining a table read with a SQL-dialect preference (a query-only option) must fail at
   * pipeline-construction time.
   */
  @Test
  public void testBuildSourceWithTableAndSqlDialect() {
    thrown.expect(IllegalArgumentException.class);
    thrown.expectMessage(
        "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
            + " which only applies to queries");
    p.apply(
        "ReadMyTable",
        BigQueryIO.read(new TableRowParser())
            .withCoder(TableRowJsonCoder.of())
            .withMethod(Method.DIRECT_READ)
            .from("foo.com:project:dataset.table")
            .usingStandardSql());
    // Not reached when validation works: apply() above throws.
    p.run();
  }
+
+  @Test
+  public void testDisplayData() {
+    String tableSpec = "foo.com:project:dataset.table";
+    BigQueryIO.TypedRead<TableRow> typedRead =
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .withSelectedFields(ImmutableList.of("foo", "bar"))
+            .withProjectionPushdownApplied()
+            .from(tableSpec);
+    DisplayData displayData = DisplayData.from(typedRead);
+    assertThat(displayData, hasDisplayItem("table", tableSpec));
+    assertThat(displayData, hasDisplayItem("selectedFields", "foo, bar"));
+    assertThat(displayData, hasDisplayItem("projectionPushdownApplied", true));
+  }
+
+  @Test
+  public void testName() {
+    assertEquals(
+        "BigQueryIO.TypedRead",
+        BigQueryIO.read(new TableRowParser())
+            .withCoder(TableRowJsonCoder.of())
+            .withMethod(Method.DIRECT_READ)
+            .from("foo.com:project:dataset.table")
+            .getName());
+  }
+
  /** The output coder is inferred from the parse function's declared return type. */
  @Test
  public void testCoderInference() {
    // Lambdas erase too much type information -- use an anonymous class here.
    SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>> parseFn =
        new SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>>() {
          @Override
          public KV<ByteString, ReadSession> apply(SchemaAndRecord input) {
            return null;
          }
        };

    assertEquals(
        KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(ReadSession.class)),
        BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault()));
  }
+
  /** Estimated size comes straight from Table.numBytes. */
  @Test
  public void testTableSourceEstimatedSize() throws Exception {
    doTableSourceEstimatedSizeTest(false);
  }
+
  /** A streaming buffer on the table must not inflate the estimated size. */
  @Test
  public void testTableSourceEstimatedSize_IgnoresStreamingBuffer() throws Exception {
    doTableSourceEstimatedSizeTest(true);
  }
+
+  private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) 
throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", 
null);
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
+    if (useStreamingBuffer) {
+      table.setStreamingBuffer(new 
Streamingbuffer().setEstimatedBytes(BigInteger.TEN));
+    }
+
+    fakeDatasetService.createTable(table);
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            ValueProvider.StaticValueProvider.of(tableRef),
+            null,
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withDatasetService(fakeDatasetService));
+
+    assertEquals(100, tableSource.getEstimatedSizeBytes(options));
+  }
+
+  @Test
+  @ProjectOverride
+  public void testTableSourceEstimatedSize_WithBigQueryProject() throws 
Exception {
+    fakeDatasetService.createDataset("bigquery-project-id", "dataset", "", "", 
null);
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("bigquery-project-id:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
+    fakeDatasetService.createTable(table);
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            
ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
+            null,
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withDatasetService(fakeDatasetService));
+
+    assertEquals(100, tableSource.getEstimatedSizeBytes(options));
+  }
+
+  @Test
+  public void testTableSourceEstimatedSize_WithDefaultProject() throws 
Exception {
+    fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("project-id:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
+    fakeDatasetService.createTable(table);
+
+    BigQueryStorageTableSource<TableRow> tableSource =
+        BigQueryStorageTableSource.create(
+            
ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
+            null,
+            null,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withDatasetService(fakeDatasetService));
+
+    assertEquals(100, tableSource.getEstimatedSizeBytes(options));
+  }
+
  // Avro schema (as JSON) matching TABLE_SCHEMA below: required string "name", required long
  // "number". Shared by the Avro-format read tests.
  private static final String AVRO_SCHEMA_STRING =
      "{\"namespace\": \"example.avro\",\n"
          + " \"type\": \"record\",\n"
          + " \"name\": \"RowRecord\",\n"
          + " \"fields\": [\n"
          + "     {\"name\": \"name\", \"type\": \"string\"},\n"
          + "     {\"name\": \"number\", \"type\": \"long\"}\n"
          + " ]\n"
          + "}";

  private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(AVRO_SCHEMA_STRING);

  // Same record trimmed to the "name" field only, i.e. the schema expected when the read selects
  // a subset of fields.
  private static final String TRIMMED_AVRO_SCHEMA_STRING =
      "{\"namespace\": \"example.avro\",\n"
          + "\"type\": \"record\",\n"
          + "\"name\": \"RowRecord\",\n"
          + "\"fields\": [\n"
          + "    {\"name\": \"name\", \"type\": \"string\"}\n"
          + " ]\n"
          + "}";

  private static final Schema TRIMMED_AVRO_SCHEMA =
      new Schema.Parser().parse(TRIMMED_AVRO_SCHEMA_STRING);

  // BigQuery-side schema corresponding to AVRO_SCHEMA / ARROW_SCHEMA.
  private static final TableSchema TABLE_SCHEMA =
      new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"),
                  new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED")));

  // Arrow equivalent of TABLE_SCHEMA. The `field` helper is presumably defined later in this
  // test file (not visible in this chunk).
  private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA =
      new org.apache.arrow.vector.types.pojo.Schema(
          asList(
              field("name", new ArrowType.Utf8()), field("number", new ArrowType.Int(64, true))));
+
  /**
   * Creates a fake table of {@code tableSize} bytes whose read session returns {@code streamCount}
   * streams, splits the table source with the given desired {@code bundleSize}, and asserts that
   * one source per stream is produced (i.e. each StreamBundle holds a single stream).
   */
  private void doTableSourceInitialSplitTest(long bundleSize, long tableSize, int streamCount)
      throws Exception {
    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");

    Table table =
        new Table().setTableReference(tableRef).setNumBytes(tableSize).setSchema(TABLE_SCHEMA);

    fakeDatasetService.createTable(table);

    // MaxStreamCount of 0 lets the service choose the number of streams.
    CreateReadSessionRequest expectedRequest =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/foo.com:project/datasets/dataset/tables/table"))
            .setMaxStreamCount(0)
            .build();

    ReadSession.Builder builder =
        ReadSession.newBuilder()
            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
            .setDataFormat(DataFormat.AVRO)
            .setEstimatedTotalBytesScanned(tableSize);
    for (int i = 0; i < streamCount; i++) {
      builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
    }

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());

    BigQueryStorageTableSource<TableRow> tableSource =
        BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(tableRef),
            null,
            null,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices()
                .withDatasetService(fakeDatasetService)
                .withStorageClient(fakeStorageClient));

    List<? extends BoundedSource<TableRow>> sources = tableSource.split(bundleSize, options);
    // Each StreamBundle is expected to contain a single stream.
    assertEquals(streamCount, sources.size());
  }
+
  // 1 MiB table split into 1 KiB bundles => 1024 single-stream bundles.
  @Test
  public void testTableSourceInitialSplit() throws Exception {
    doTableSourceInitialSplitTest(1024L, 1024L * 1024L, 1024);
  }
+
  // Small stream count from the service: split still yields one source per stream.
  @Test
  public void testTableSourceInitialSplit_MinSplitCount() throws Exception {
    doTableSourceInitialSplitTest(1024L, 1024L * 1024L, 10);
  }
+
  // Large stream count (10k) with tiny bundle size: still one source per stream.
  @Test
  public void testTableSourceInitialSplit_MaxSplitCount() throws Exception {
    doTableSourceInitialSplitTest(10L, 1024L * 1024L, 10_000);
  }
+
  /**
   * Verifies that selected fields and a row restriction are propagated into the read session's
   * TableReadOptions, and that the split count follows the estimated scanned bytes (100 bytes /
   * 20-byte bundles => 5 StreamBundles).
   */
  @Test
  public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() throws Exception {
    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");

    Table table = new Table().setTableReference(tableRef).setNumBytes(200L).setSchema(TABLE_SCHEMA);

    fakeDatasetService.createTable(table);

    CreateReadSessionRequest expectedRequest =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                    .setReadOptions(
                        ReadSession.TableReadOptions.newBuilder()
                            .addSelectedFields("name")
                            .setRowRestriction("number > 5")))
            .setMaxStreamCount(0)
            .build();

    // The returned session reports the trimmed schema since only "name" was selected.
    ReadSession.Builder builder =
        ReadSession.newBuilder()
            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
            .setDataFormat(DataFormat.AVRO)
            .setEstimatedTotalBytesScanned(100L);
    for (int i = 0; i < 10; i++) {
      builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
    }

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());

    BigQueryStorageTableSource<TableRow> tableSource =
        BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(tableRef),
            StaticValueProvider.of(Lists.newArrayList("name")),
            StaticValueProvider.of("number > 5"),
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices()
                .withDatasetService(fakeDatasetService)
                .withStorageClient(fakeStorageClient));

    List<? extends BoundedSource<TableRow>> sources = tableSource.split(20L, options);
    assertEquals(5, sources.size());
  }
+
  /**
   * Verifies splitting when the table spec omits the project: the expected request uses
   * "project-id" (the default project from options), and a bundle size larger than the scanned
   * bytes yields a single StreamBundle holding every stream.
   */
  @Test
  public void testTableSourceInitialSplit_WithDefaultProject() throws Exception {
    fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
    TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");

    Table table =
        new Table().setTableReference(tableRef).setNumBytes(1024L).setSchema(TABLE_SCHEMA);

    fakeDatasetService.createTable(table);

    CreateReadSessionRequest expectedRequest =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/project-id/datasets/dataset/tables/table"))
            .setMaxStreamCount(0)
            .build();

    ReadSession.Builder builder =
        ReadSession.newBuilder()
            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
            .setDataFormat(DataFormat.AVRO)
            .setEstimatedTotalBytesScanned(1024L);
    for (int i = 0; i < 50; i++) {
      builder.addStreams(ReadStream.newBuilder().setName("stream-" + i));
    }

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());

    // Note: the table spec here deliberately has no project component.
    BigQueryStorageTableSource<TableRow> tableSource =
        BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
            null,
            null,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices()
                .withDatasetService(fakeDatasetService)
                .withStorageClient(fakeStorageClient));

    List<? extends BoundedSource<TableRow>> sources = tableSource.split(4096L, options);
    // A single StreamBundle containing all the Streams.
    assertEquals(1, sources.size());
  }
+
  /**
   * Verifies that splitting a source whose read session comes back with no streams (an empty
   * table) produces no sources at all.
   */
  @Test
  public void testTableSourceInitialSplit_EmptyTable() throws Exception {
    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");

    Table table =
        new Table()
            .setTableReference(tableRef)
            .setNumBytes(1024L * 1024L)
            .setSchema(new TableSchema());

    fakeDatasetService.createTable(table);

    CreateReadSessionRequest expectedRequest =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/foo.com:project/datasets/dataset/tables/table"))
            .setMaxStreamCount(0)
            .build();

    // A default-instance ReadSession carries zero streams.
    ReadSession emptyReadSession = ReadSession.newBuilder().build();
    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(emptyReadSession);

    BigQueryStorageTableSource<TableRow> tableSource =
        BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(tableRef),
            null,
            null,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices()
                .withDatasetService(fakeDatasetService)
                .withStorageClient(fakeStorageClient));

    List<? extends BoundedSource<TableRow>> sources = tableSource.split(1024L, options);
    assertTrue(sources.isEmpty());
  }
+
  /**
   * Reading directly from the table-level source is unsupported: the source must be split into
   * stream-level sources first, so createReader must throw.
   */
  @Test
  public void testTableSourceCreateReader() throws Exception {
    BigQueryStorageTableSource<TableRow> tableSource =
        BigQueryStorageTableSource.create(
            ValueProvider.StaticValueProvider.of(
                BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")),
            null,
            null,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withDatasetService(fakeDatasetService));

    thrown.expect(UnsupportedOperationException.class);
    thrown.expectMessage("BigQuery storage source must be split before reading");
    tableSource.createReader(options);
  }
+
+  private static GenericRecord createRecord(String name, Schema schema) {
+    GenericRecord genericRecord = new Record(schema);
+    genericRecord.put("name", name);
+    return genericRecord;
+  }
+
+  private static GenericRecord createRecord(String name, long number, Schema 
schema) {
+    GenericRecord genericRecord = new Record(schema);
+    genericRecord.put("name", name);
+    genericRecord.put("number", number);
+    return genericRecord;
+  }
+
+  private static ByteString serializeArrowSchema(
+      org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
+    ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+    try {
+      MessageSerializer.serialize(
+          new WriteChannel(Channels.newChannel(byteOutputStream)), 
arrowSchema);
+    } catch (IOException ex) {
+      throw new RuntimeException("Failed to serialize arrow schema.", ex);
+    }
+    return ByteString.copyFrom(byteOutputStream.toByteArray());
+  }
+
  // Shared Avro encoder factory used by createResponse to serialize test records.
  private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
+
  /**
   * Builds a ReadRowsResponse carrying {@code genericRecords} serialized as Avro binary rows,
   * annotated with the given start/end progress fractions in its StreamStats.
   */
  private static ReadRowsResponse createResponse(
      Schema schema,
      Collection<GenericRecord> genericRecords,
      double progressAtResponseStart,
      double progressAtResponseEnd)
      throws Exception {
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    Encoder binaryEncoder = ENCODER_FACTORY.binaryEncoder(outputStream, null);
    for (GenericRecord genericRecord : genericRecords) {
      writer.write(genericRecord, binaryEncoder);
    }

    binaryEncoder.flush();

    return ReadRowsResponse.newBuilder()
        .setAvroRows(
            AvroRows.newBuilder()
                .setSerializedBinaryRows(ByteString.copyFrom(outputStream.toByteArray()))
                .setRowCount(genericRecords.size()))
        .setRowCount(genericRecords.size())
        .setStats(
            StreamStats.newBuilder()
                .setProgress(
                    Progress.newBuilder()
                        .setAtResponseStart(progressAtResponseStart)
                        .setAtResponseEnd(progressAtResponseEnd)))
        .build();
  }
+
  /**
   * Builds a ReadRowsResponse whose rows are an Arrow record batch with parallel {@code name} /
   * {@code number} columns, annotated with the given progress fractions. Assumes name and number
   * lists are the same length.
   */
  private ReadRowsResponse createResponseArrow(
      org.apache.arrow.vector.types.pojo.Schema arrowSchema,
      List<String> name,
      List<Long> number,
      double progressAtResponseStart,
      double progressAtResponseEnd) {
    ArrowRecordBatch serializedRecord;
    // VectorSchemaRoot owns Arrow buffers; close it via try-with-resources.
    try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(arrowSchema, allocator)) {
      schemaRoot.allocateNew();
      schemaRoot.setRowCount(name.size());
      VarCharVector strVector = (VarCharVector) schemaRoot.getFieldVectors().get(0);
      BigIntVector bigIntVector = (BigIntVector) schemaRoot.getFieldVectors().get(1);
      for (int i = 0; i < name.size(); i++) {
        bigIntVector.set(i, number.get(i));
        strVector.set(i, new Text(name.get(i)));
      }

      // Detach the populated vectors into a record batch, then serialize it to the
      // Arrow IPC wire format.
      VectorUnloader unLoader = new VectorUnloader(schemaRoot);
      try (org.apache.arrow.vector.ipc.message.ArrowRecordBatch records =
          unLoader.getRecordBatch()) {
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
          MessageSerializer.serialize(new WriteChannel(Channels.newChannel(os)), records);
          serializedRecord =
              ArrowRecordBatch.newBuilder()
                  .setRowCount(records.getLength())
                  .setSerializedRecordBatch(ByteString.copyFrom(os.toByteArray()))
                  .build();
        } catch (IOException e) {
          throw new RuntimeException("Error writing to byte array output stream", e);
        }
      }
    }

    return ReadRowsResponse.newBuilder()
        .setArrowRecordBatch(serializedRecord)
        .setRowCount(name.size())
        .setStats(
            StreamStats.newBuilder()
                .setProgress(
                    Progress.newBuilder()
                        .setAtResponseStart(progressAtResponseStart)
                        .setAtResponseEnd(progressAtResponseEnd)))
        .build();
  }
+
  // A stream-bundle source has no size estimate of its own; it must report 0 bytes.
  @Test
  public void testStreamSourceEstimatedSizeBytes() throws Exception {
    List<ReadStream> streamBundle = Lists.newArrayList(ReadStream.getDefaultInstance());
    BigQueryStorageStreamBundleSource<TableRow> streamSource =
        BigQueryStorageStreamBundleSource.create(
            ReadSession.getDefaultInstance(),
            streamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices(),
            1L);

    assertEquals(0, streamSource.getEstimatedSizeBytes(options));
  }
+
  // Initial splitting of a stream-bundle source is a no-op: it returns only itself.
  @Test
  public void testStreamSourceSplit() throws Exception {
    List<ReadStream> streamBundle = Lists.newArrayList(ReadStream.getDefaultInstance());
    BigQueryStorageStreamBundleSource<TableRow> streamSource =
        BigQueryStorageStreamBundleSource.create(
            ReadSession.getDefaultInstance(),
            streamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices(),
            1L);

    assertThat(streamSource.split(0, options), containsInAnyOrder(streamSource));
  }
+
+  @Test
+  public void testReadFromStreamSource() throws Exception {
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSession")
+            
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+            .build();
+
+    ReadRowsRequest expectedRequestOne =
+        
ReadRowsRequest.newBuilder().setReadStream("readStream1").setOffset(0).build();
+    ReadRowsRequest expectedRequestTwo =
+        
ReadRowsRequest.newBuilder().setReadStream("readStream2").setOffset(0).build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", 1, AVRO_SCHEMA),
+            createRecord("B", 2, AVRO_SCHEMA),
+            createRecord("C", 3, AVRO_SCHEMA),
+            createRecord("D", 4, AVRO_SCHEMA),
+            createRecord("E", 5, AVRO_SCHEMA),
+            createRecord("F", 6, AVRO_SCHEMA));
+
+    List<ReadRowsResponse> responsesOne =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(AVRO_SCHEMA, records.subList(2, 3), 0.5, 0.75));
+    List<ReadRowsResponse> responsesTwo =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(3, 5), 0.0, 0.50),
+            createResponse(AVRO_SCHEMA, records.subList(5, 6), 0.5, 0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(expectedRequestOne, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(responsesOne));
+    when(fakeStorageClient.readRows(expectedRequestTwo, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(responsesTwo));
+
+    List<ReadStream> streamBundle =
+        Lists.newArrayList(
+            ReadStream.newBuilder().setName("readStream1").build(),
+            ReadStream.newBuilder().setName("readStream2").build());
+    BigQueryStorageStreamBundleSource<TableRow> streamSource =
+        BigQueryStorageStreamBundleSource.create(
+            readSession,
+            streamBundle,
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
+            1L);
+
+    List<TableRow> rows = new ArrayList<>();
+    BigQueryStorageStreamBundleReader<TableRow> reader = 
streamSource.createReader(options);
+    for (boolean hasNext = reader.start(); hasNext; hasNext = 
reader.advance()) {
+      rows.add(reader.getCurrent());
+    }
+
+    System.out.println("Rows: " + rows);
+
+    assertEquals(6, rows.size());
+  }
+
  // Tolerance for floating-point comparisons of fraction-consumed values.
  private static final double DELTA = 1e-6;
+
  /**
   * Verifies getFractionConsumed for a bundle with a single stream: the fraction interpolates
   * linearly between each response's start/end progress as rows are consumed, and reaches 1.0
   * when the stream is exhausted.
   */
  @Test
  public void testFractionConsumedWithOneStreamInBundle() throws Exception {
    ReadSession readSession =
        ReadSession.newBuilder()
            .setName("readSession")
            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
            .build();

    ReadRowsRequest expectedRequest =
        ReadRowsRequest.newBuilder().setReadStream("readStream").build();

    List<GenericRecord> records =
        Lists.newArrayList(
            createRecord("A", 1, AVRO_SCHEMA),
            createRecord("B", 2, AVRO_SCHEMA),
            createRecord("C", 3, AVRO_SCHEMA),
            createRecord("D", 4, AVRO_SCHEMA),
            createRecord("E", 5, AVRO_SCHEMA),
            createRecord("F", 6, AVRO_SCHEMA),
            createRecord("G", 7, AVRO_SCHEMA));

    List<ReadRowsResponse> responses =
        Lists.newArrayList(
            createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.25),
            // Some responses may contain zero results, so we must ensure that we are resilient
            // to such responses.
            createResponse(AVRO_SCHEMA, Lists.newArrayList(), 0.25, 0.25),
            createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.3, 0.5),
            createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.7, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(expectedRequest, ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses));

    List<ReadStream> streamBundle =
        Lists.newArrayList(ReadStream.newBuilder().setName("readStream").build());
    BigQueryStorageStreamBundleSource<TableRow> streamSource =
        BigQueryStorageStreamBundleSource.create(
            readSession,
            streamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    BoundedReader<TableRow> reader = streamSource.createReader(options);

    // Before call to BoundedReader#start, fraction consumed must be zero.
    assertEquals(0.0, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.start()); // Reads A.
    assertEquals(0.125, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads B.
    assertEquals(0.25, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads C.
    assertEquals(0.4, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads D.
    assertEquals(0.5, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads E.
    assertEquals(0.8, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads F.
    assertEquals(0.9, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads G.
    assertEquals(1.0, reader.getFractionConsumed(), DELTA);

    assertFalse(reader.advance()); // Reaches the end.

    // We are done with the stream, so we should report 100% consumption.
    assertEquals(Double.valueOf(1.0), reader.getFractionConsumed());
  }
+
  /**
   * Verifies getFractionConsumed for a bundle of two streams: the per-stream progress is scaled
   * by the stream's position in the bundle (stream 1 covers [0, 0.5], stream 2 covers [0.5, 1.0]).
   */
  @Test
  public void testFractionConsumedWithMultipleStreamsInBundle() throws Exception {
    ReadSession readSession =
        ReadSession.newBuilder()
            .setName("readSession")
            .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
            .build();

    ReadRowsRequest expectedRequestOne =
        ReadRowsRequest.newBuilder().setReadStream("readStream1").build();
    ReadRowsRequest expectedRequestTwo =
        ReadRowsRequest.newBuilder().setReadStream("readStream2").build();

    List<GenericRecord> records =
        Lists.newArrayList(
            createRecord("A", 1, AVRO_SCHEMA),
            createRecord("B", 2, AVRO_SCHEMA),
            createRecord("C", 3, AVRO_SCHEMA),
            createRecord("D", 4, AVRO_SCHEMA),
            createRecord("E", 5, AVRO_SCHEMA),
            createRecord("F", 6, AVRO_SCHEMA),
            createRecord("G", 7, AVRO_SCHEMA));

    List<ReadRowsResponse> responsesOne =
        Lists.newArrayList(
            createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.5),
            // Some responses may contain zero results, so we must ensure that we are resilient
            // to such responses.
            createResponse(AVRO_SCHEMA, Lists.newArrayList(), 0.5, 0.5),
            createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.5, 1.0));

    List<ReadRowsResponse> responsesTwo =
        Lists.newArrayList(createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.0, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(expectedRequestOne, ""))
        .thenReturn(new FakeBigQueryServerStream<>(responsesOne));
    when(fakeStorageClient.readRows(expectedRequestTwo, ""))
        .thenReturn(new FakeBigQueryServerStream<>(responsesTwo));

    List<ReadStream> streamBundle =
        Lists.newArrayList(
            ReadStream.newBuilder().setName("readStream1").build(),
            ReadStream.newBuilder().setName("readStream2").build());

    BigQueryStorageStreamBundleSource<TableRow> streamSource =
        BigQueryStorageStreamBundleSource.create(
            readSession,
            streamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    BoundedReader<TableRow> reader = streamSource.createReader(options);

    // Before call to BoundedReader#start, fraction consumed must be zero.
    assertEquals(0.0, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.start()); // Reads A.
    assertEquals(0.125, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads B.
    assertEquals(0.25, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads C.
    assertEquals(0.375, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads D.
    assertEquals(0.5, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads E.
    assertEquals(0.6666666666666666, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads F.
    assertEquals(0.8333333333333333, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads G.
    assertEquals(1.0, reader.getFractionConsumed(), DELTA);

    assertFalse(reader.advance()); // Reaches the end.

    // We are done with the streams, so we should report 100% consumption.
    assertEquals(Double.valueOf(1.0), reader.getFractionConsumed());
  }
+
  /**
   * With a single stream in the bundle, splitAtFraction cannot split below stream granularity, so
   * it must return null and the primary reader must deliver every record.
   */
  @Test
  public void testStreamSourceSplitAtFractionNoOpWithOneStreamInBundle() throws Exception {
    List<ReadRowsResponse> responses =
        Lists.newArrayList(
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
                0.0,
                0.25),
            createResponse(
                AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.25, 0.50),
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)),
                0.50,
                0.75));

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses));

    List<ReadStream> parentStreamBundle =
        Lists.newArrayList(ReadStream.newBuilder().setName("parentStream").build());
    BigQueryStorageStreamBundleSource<TableRow> streamBundleSource =
        BigQueryStorageStreamBundleSource.create(
            ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build(),
            parentStreamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    // Read a few records from the parent stream and ensure that records are returned in the
    // prescribed order.
    BoundedReader<TableRow> primary = streamBundleSource.createReader(options);
    assertTrue(primary.start());
    assertEquals("A", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("B", primary.getCurrent().get("name"));

    // Now split the stream. Since we do NOT split below the granularity of a single stream,
    // this will be a No-Op and the primary source should be read to completion.
    BoundedSource<TableRow> secondary = primary.splitAtFraction(0.5);
    assertNull(secondary);

    assertTrue(primary.advance());
    assertEquals("C", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("D", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("E", primary.getCurrent().get("name"));
    assertFalse(primary.advance());
  }
+
  /**
   * With three streams in the bundle, splitAtFraction only succeeds at stream granularity: a
   * fraction below 1/3 is rejected, a mid-bundle split hands streams 2-3 to the secondary source,
   * and splitting after the last stream has started reading is a no-op.
   */
  @Test
  public void testStreamSourceSplitAtFractionWithMultipleStreamsInBundle() throws Exception {
    List<ReadRowsResponse> responses =
        Lists.newArrayList(
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
                0.0,
                0.6),
            createResponse(
                AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.6, 1.0),
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("D", 4, AVRO_SCHEMA),
                    createRecord("E", 5, AVRO_SCHEMA),
                    createRecord("F", 6, AVRO_SCHEMA)),
                0.0,
                1.0),
            createResponse(
                AVRO_SCHEMA, Lists.newArrayList(createRecord("G", 7, AVRO_SCHEMA)), 0.0, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream1").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(0, 2)));
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream2").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(2, 3)));
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream3").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(3, 4)));

    List<ReadStream> primaryStreamBundle =
        Lists.newArrayList(
            ReadStream.newBuilder().setName("readStream1").build(),
            ReadStream.newBuilder().setName("readStream2").build(),
            ReadStream.newBuilder().setName("readStream3").build());

    BigQueryStorageStreamBundleSource<TableRow> primarySource =
        BigQueryStorageStreamBundleSource.create(
            ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build(),
            primaryStreamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    // Read a few records from the primary Source and ensure that records are returned in the
    // prescribed order.
    BoundedReader<TableRow> primary = primarySource.createReader(options);

    assertTrue(primary.start());

    // Attempting to split at a sub-Stream level which is NOT supported by the
    // `BigQueryStorageStreamBundleSource`. IOTW, since there are exactly 3 Streams in the Source,
    // a split will only occur for fraction > 0.33.
    BoundedSource<TableRow> secondarySource = primary.splitAtFraction(0.05);
    assertNull(secondarySource);

    assertEquals("A", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("B", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("C", primary.getCurrent().get("name"));

    // Now split the primary Source, and ensure that the returned source points to a non-null
    // StreamBundle containing Streams 2 & 3.
    secondarySource = primary.splitAtFraction(0.5);
    assertNotNull(secondarySource);
    BoundedReader<TableRow> secondary = secondarySource.createReader(options);

    // Since the last two streams were split out the Primary source has been exhausted.
    assertFalse(primary.advance());

    assertTrue(secondary.start());
    assertEquals("D", secondary.getCurrent().get("name"));
    assertTrue(secondary.advance());
    assertEquals("E", secondary.getCurrent().get("name"));
    assertTrue(secondary.advance());
    assertEquals("F", secondary.getCurrent().get("name"));
    assertTrue((secondary.advance()));

    // Since we have already started reading from the last Stream in the StreamBundle, splitting
    // is now a no-op.
    BoundedSource<TableRow> tertiarySource = secondary.splitAtFraction(0.55);
    assertNull(tertiarySource);

    assertEquals("G", secondary.getCurrent().get("name"));
    assertFalse((secondary.advance()));
  }
+
  // Verifies that splitAtFraction() can be called repeatedly on the same primary reader, with
  // each successful call peeling trailing streams off the primary's StreamBundle, and that a
  // split is refused once the primary is down to a single stream (no sub-stream splitting).
  @Test
  public void testStreamSourceSplitAtFractionRepeatedWithMultipleStreamInBundle() throws Exception {
    // Stream 1 yields A, B, C over two responses; Stream 2 yields D, E, F; Stream 3 yields G.
    List<ReadRowsResponse> responses =
        Lists.newArrayList(
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
                0.0,
                0.6),
            createResponse(
                AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.6, 1.0),
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("D", 4, AVRO_SCHEMA),
                    createRecord("E", 5, AVRO_SCHEMA),
                    createRecord("F", 6, AVRO_SCHEMA)),
                0.0,
                1.0),
            createResponse(
                AVRO_SCHEMA, Lists.newArrayList(createRecord("G", 7, AVRO_SCHEMA)), 0.0, 1.0));

    // Stub each named read stream to replay its slice of the canned responses.
    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream1").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(0, 2)));
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream2").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(2, 3)));
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream3").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(3, 4)));

    List<ReadStream> primaryStreamBundle =
        Lists.newArrayList(
            ReadStream.newBuilder().setName("readStream1").build(),
            ReadStream.newBuilder().setName("readStream2").build(),
            ReadStream.newBuilder().setName("readStream3").build());

    BigQueryStorageStreamBundleSource<TableRow> primarySource =
        BigQueryStorageStreamBundleSource.create(
            ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build(),
            primaryStreamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    // Read a few records from the primary Source and ensure that records are returned in the
    // prescribed order.
    BoundedReader<TableRow> primary = primarySource.createReader(options);

    assertTrue(primary.start());
    assertEquals("A", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("B", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("C", primary.getCurrent().get("name"));

    // Now split the primary Source, and ensure that the returned source points to a non-null
    // StreamBundle containing ONLY Stream 3. Since there are exactly 3 Streams in the Source,
    // a split will only occur for fraction > 0.33.
    BoundedSource<TableRow> secondarySource = primary.splitAtFraction(0.7);
    assertNotNull(secondarySource);
    BoundedReader<TableRow> secondary = secondarySource.createReader(options);
    assertTrue(secondary.start());
    assertEquals("G", secondary.getCurrent().get("name"));
    assertFalse((secondary.advance()));

    // A second splitAtFraction() call on the primary source. The resulting source should
    // contain a StreamBundle containing ONLY Stream 2. Since there are 2 Streams in the Source,
    // a split will only occur for fraction > 0.50.
    BoundedSource<TableRow> tertiarySource = primary.splitAtFraction(0.55);
    assertNotNull(tertiarySource);
    BoundedReader<TableRow> tertiary = tertiarySource.createReader(options);
    assertTrue(tertiary.start());
    assertEquals("D", tertiary.getCurrent().get("name"));
    assertTrue(tertiary.advance());
    assertEquals("E", tertiary.getCurrent().get("name"));
    assertTrue(tertiary.advance());
    assertEquals("F", tertiary.getCurrent().get("name"));
    assertFalse(tertiary.advance());

    // A third attempt to split the primary source. This will be ignored since the primary source
    // since the Source contains only a single stream now and `BigQueryStorageStreamBundleSource`
    // does NOT support sub-stream splitting.
    tertiarySource = primary.splitAtFraction(0.9);
    assertNull(tertiarySource);

    // All the rows in the primary Source have been read.
    assertFalse(primary.advance());
  }
+
  // Verifies that splitAtFraction() returns null (split refused) when the requested split point
  // falls at or before the stream the parent reader is already consuming.
  @Test
  public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() throws Exception {
    // Stream 1 yields A, B, C over two responses; Stream 2 yields D, E in a single response.
    List<ReadRowsResponse> responses =
        Lists.newArrayList(
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
                0.0,
                0.66),
            createResponse(
                AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.66, 1.0),
            createResponse(
                AVRO_SCHEMA,
                Lists.newArrayList(
                    createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)),
                0.0,
                1.0));

    // Stub each named read stream to replay its slice of the canned responses.
    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream1").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(0, 2)));
    when(fakeStorageClient.readRows(
            ReadRowsRequest.newBuilder().setReadStream("readStream2").build(), ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(2, 3)));

    List<ReadStream> parentStreamBundle =
        Lists.newArrayList(
            ReadStream.newBuilder().setName("readStream1").build(),
            ReadStream.newBuilder().setName("readStream2").build());

    BigQueryStorageStreamBundleSource<TableRow> streamBundleSource =
        BigQueryStorageStreamBundleSource.create(
            ReadSession.newBuilder()
                .setName("readSession")
                .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
                .build(),
            parentStreamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    // Read a few records from the parent bundle and ensure the records are returned in
    // the prescribed order. Reading "D" means the reader has moved into the second stream.
    BoundedReader<TableRow> primary = streamBundleSource.createReader(options);
    assertTrue(primary.start());
    assertEquals("A", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("B", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("C", primary.getCurrent().get("name"));
    assertTrue(primary.advance());
    assertEquals("D", primary.getCurrent().get("name"));

    // We attempt to split the StreamBundle after starting to read the contents of the second
    // stream.
    BoundedSource<TableRow> secondarySource = primary.splitAtFraction(0.5);
    assertNull(secondarySource);

    assertTrue(primary.advance());
    assertEquals("E", primary.getCurrent().get("name"));
    assertFalse(primary.advance());
  }
+
+  private static final class ParseKeyValue
+      implements SerializableFunction<SchemaAndRecord, KV<String, Long>> {
+
+    @Override
+    public KV<String, Long> apply(SchemaAndRecord input) {
+      return KV.of(
+          input.getRecord().get("name").toString(), (Long) 
input.getRecord().get("number"));
+    }
+  }
+
+  @Test
+  public void testReadFromBigQueryIO() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", 
null);
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new 
Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    
.setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setDataFormat(DataFormat.AVRO))
+            .setMaxStreamCount(0)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
+            .addStreams(ReadStream.newBuilder().setName("streamName1"))
+            .addStreams(ReadStream.newBuilder().setName("streamName2"))
+            .setDataFormat(DataFormat.AVRO)
+            .setEstimatedTotalBytesScanned(10L)
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequestOne =
+        ReadRowsRequest.newBuilder().setReadStream("streamName1").build();
+    ReadRowsRequest expectedReadRowsRequestTwo =
+        ReadRowsRequest.newBuilder().setReadStream("streamName2").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", 1, AVRO_SCHEMA),
+            createRecord("B", 2, AVRO_SCHEMA),
+            createRecord("C", 3, AVRO_SCHEMA),
+            createRecord("D", 4, AVRO_SCHEMA),
+            createRecord("E", 5, AVRO_SCHEMA),
+            createRecord("F", 6, AVRO_SCHEMA),
+            createRecord("G", 7, AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponsesOne =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.5, 1.0));
+    List<ReadRowsResponse> readRowsResponsesTwo =
+        Lists.newArrayList(
+            createResponse(AVRO_SCHEMA, records.subList(4, 5), 0.0, 0.33),
+            createResponse(AVRO_SCHEMA, records.subList(5, 7), 0.33, 1.0));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, 
withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequestOne, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesOne));
+    when(fakeStorageClient.readRows(expectedReadRowsRequestTwo, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesTwo));
+
+    PCollection<KV<String, Long>> output =
+        p.apply(
+            BigQueryIO.read(new ParseKeyValue())
+                .from("foo.com:project:dataset.table")
+                .withMethod(Method.DIRECT_READ)
+                .withFormat(DataFormat.AVRO)
+                .withTestServices(
+                    new FakeBigQueryServices()
+                        .withDatasetService(fakeDatasetService)
+                        .withStorageClient(fakeStorageClient)));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(
+                KV.of("A", 1L),
+                KV.of("B", 2L),
+                KV.of("C", 3L),
+                KV.of("D", 4L),
+                KV.of("E", 5L),
+                KV.of("F", 6L),
+                KV.of("G", 7L)));
+
+    p.run();
+  }
+
  // Same pipeline shape as testReadFromBigQueryIO, but with withSelectedFields("name"): the
  // expected session request carries TableReadOptions, the session returns the trimmed Avro
  // schema, and the output rows contain only the "name" column.
  @Test
  public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
    fakeDatasetService.createTable(table);

    CreateReadSessionRequest expectedCreateReadSessionRequest =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                    .setReadOptions(
                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
                    .setDataFormat(DataFormat.AVRO))
            .setMaxStreamCount(0)
            .build();

    ReadSession readSession =
        ReadSession.newBuilder()
            .setName("readSessionName")
            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
            .addStreams(ReadStream.newBuilder().setName("streamName1"))
            .addStreams(ReadStream.newBuilder().setName("streamName2"))
            .setDataFormat(DataFormat.AVRO)
            .build();

    ReadRowsRequest expectedReadRowsRequestOne =
        ReadRowsRequest.newBuilder().setReadStream("streamName1").build();
    ReadRowsRequest expectedReadRowsRequestTwo =
        ReadRowsRequest.newBuilder().setReadStream("streamName2").build();

    // Records built against the trimmed (name-only) schema.
    List<GenericRecord> records =
        Lists.newArrayList(
            createRecord("A", TRIMMED_AVRO_SCHEMA),
            createRecord("B", TRIMMED_AVRO_SCHEMA),
            createRecord("C", TRIMMED_AVRO_SCHEMA),
            createRecord("D", TRIMMED_AVRO_SCHEMA),
            createRecord("E", TRIMMED_AVRO_SCHEMA),
            createRecord("F", TRIMMED_AVRO_SCHEMA),
            createRecord("G", TRIMMED_AVRO_SCHEMA));

    // Stream 1 carries A-D; stream 2 carries E-G.
    List<ReadRowsResponse> readRowsResponsesOne =
        Lists.newArrayList(
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
    List<ReadRowsResponse> readRowsResponsesTwo =
        Lists.newArrayList(
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(4, 5), 0.0, 0.33),
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(5, 7), 0.33, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
        .thenReturn(readSession);
    when(fakeStorageClient.readRows(expectedReadRowsRequestOne, ""))
        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesOne));
    when(fakeStorageClient.readRows(expectedReadRowsRequestTwo, ""))
        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesTwo));

    PCollection<TableRow> output =
        p.apply(
            BigQueryIO.readTableRows()
                .from("foo.com:project:dataset.table")
                .withMethod(Method.DIRECT_READ)
                .withSelectedFields(Lists.newArrayList("name"))
                .withFormat(DataFormat.AVRO)
                .withTestServices(
                    new FakeBigQueryServices()
                        .withDatasetService(fakeDatasetService)
                        .withStorageClient(fakeStorageClient)));

    PAssert.that(output)
        .containsInAnyOrder(
            ImmutableList.of(
                new TableRow().set("name", "A"),
                new TableRow().set("name", "B"),
                new TableRow().set("name", "C"),
                new TableRow().set("name", "D"),
                new TableRow().set("name", "E"),
                new TableRow().set("name", "F"),
                new TableRow().set("name", "G")));

    p.run();
  }
+
  // Same trimmed-schema setup as testReadFromBigQueryIOWithTrimmedSchema, but reads via
  // readTableRowsWithSchema() and converts the output to Beam Rows with Convert.toRows(),
  // asserting against a single-field Beam schema.
  @Test
  public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
    fakeDatasetService.createTable(table);

    CreateReadSessionRequest expectedCreateReadSessionRequest =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                    .setReadOptions(
                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
                    .setDataFormat(DataFormat.AVRO))
            .setMaxStreamCount(0)
            .build();

    ReadSession readSession =
        ReadSession.newBuilder()
            .setName("readSessionName")
            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
            .addStreams(ReadStream.newBuilder().setName("streamName1"))
            .addStreams(ReadStream.newBuilder().setName("streamName2"))
            .setDataFormat(DataFormat.AVRO)
            .build();

    ReadRowsRequest expectedReadRowsRequestOne =
        ReadRowsRequest.newBuilder().setReadStream("streamName1").build();
    ReadRowsRequest expectedReadRowsRequestTwo =
        ReadRowsRequest.newBuilder().setReadStream("streamName2").build();

    // Records built against the trimmed (name-only) schema.
    List<GenericRecord> records =
        Lists.newArrayList(
            createRecord("A", TRIMMED_AVRO_SCHEMA),
            createRecord("B", TRIMMED_AVRO_SCHEMA),
            createRecord("C", TRIMMED_AVRO_SCHEMA),
            createRecord("D", TRIMMED_AVRO_SCHEMA),
            createRecord("E", TRIMMED_AVRO_SCHEMA),
            createRecord("F", TRIMMED_AVRO_SCHEMA),
            createRecord("G", TRIMMED_AVRO_SCHEMA));

    // Stream 1 carries A-D; stream 2 carries E-G.
    List<ReadRowsResponse> readRowsResponsesOne =
        Lists.newArrayList(
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
    List<ReadRowsResponse> readRowsResponsesTwo =
        Lists.newArrayList(
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(4, 5), 0.0, 0.33),
            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(5, 7), 0.33, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
        .thenReturn(readSession);
    when(fakeStorageClient.readRows(expectedReadRowsRequestOne, ""))
        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesOne));
    when(fakeStorageClient.readRows(expectedReadRowsRequestTwo, ""))
        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesTwo));

    PCollection<Row> output =
        p.apply(
                BigQueryIO.readTableRowsWithSchema()
                    .from("foo.com:project:dataset.table")
                    .withMethod(Method.DIRECT_READ)
                    .withSelectedFields(Lists.newArrayList("name"))
                    .withFormat(DataFormat.AVRO)
                    .withTestServices(
                        new FakeBigQueryServices()
                            .withDatasetService(fakeDatasetService)
                            .withStorageClient(fakeStorageClient)))
            .apply(Convert.toRows());

    // Fully-qualified names avoid clashing with the Avro Schema import in this file.
    org.apache.beam.sdk.schemas.Schema beamSchema =
        org.apache.beam.sdk.schemas.Schema.of(
            org.apache.beam.sdk.schemas.Schema.Field.of(
                "name", org.apache.beam.sdk.schemas.Schema.FieldType.STRING));
    PAssert.that(output)
        .containsInAnyOrder(
            ImmutableList.of(
                Row.withSchema(beamSchema).addValue("A").build(),
                Row.withSchema(beamSchema).addValue("B").build(),
                Row.withSchema(beamSchema).addValue("C").build(),
                Row.withSchema(beamSchema).addValue("D").build(),
                Row.withSchema(beamSchema).addValue("E").build(),
                Row.withSchema(beamSchema).addValue("F").build(),
                Row.withSchema(beamSchema).addValue("G").build()));

    p.run();
  }
+
  // End-to-end pipeline test mirroring testReadFromBigQueryIO, but with DataFormat.ARROW:
  // the session carries a serialized Arrow schema and the responses are Arrow record batches.
  @Test
  public void testReadFromBigQueryIOArrow() throws Exception {
    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
    fakeDatasetService.createTable(table);

    CreateReadSessionRequest expectedCreateReadSessionRequest =
        CreateReadSessionRequest.newBuilder()
            .setParent("projects/project-id")
            .setReadSession(
                ReadSession.newBuilder()
                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
                    .setDataFormat(DataFormat.ARROW))
            .setMaxStreamCount(0)
            .build();

    ReadSession readSession =
        ReadSession.newBuilder()
            .setName("readSessionName")
            .setArrowSchema(
                ArrowSchema.newBuilder()
                    .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))
                    .build())
            .addStreams(ReadStream.newBuilder().setName("streamName1"))
            .addStreams(ReadStream.newBuilder().setName("streamName2"))
            .setDataFormat(DataFormat.ARROW)
            .build();

    ReadRowsRequest expectedReadRowsRequestOne =
        ReadRowsRequest.newBuilder().setReadStream("streamName1").build();
    ReadRowsRequest expectedReadRowsRequestTwo =
        ReadRowsRequest.newBuilder().setReadStream("streamName2").build();

    // Parallel name/value columns for the Arrow batches.
    List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
    List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
    // Stream 1 carries A-D over two batches; stream 2 carries E-G over three batches.
    List<ReadRowsResponse> readRowsResponsesOne =
        Lists.newArrayList(
            createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.50),
            createResponseArrow(
                ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.5, 0.75));
    List<ReadRowsResponse> readRowsResponsesTwo =
        Lists.newArrayList(
            createResponseArrow(ARROW_SCHEMA, names.subList(4, 5), values.subList(4, 5), 0.0, 0.33),
            createResponseArrow(
                ARROW_SCHEMA, names.subList(5, 6), values.subList(5, 6), 0.33, 0.66),
            createResponseArrow(
                ARROW_SCHEMA, names.subList(6, 7), values.subList(6, 7), 0.66, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
        .thenReturn(readSession);
    when(fakeStorageClient.readRows(expectedReadRowsRequestOne, ""))
        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesOne));
    when(fakeStorageClient.readRows(expectedReadRowsRequestTwo, ""))
        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponsesTwo));

    PCollection<KV<String, Long>> output =
        p.apply(
            BigQueryIO.read(new ParseKeyValue())
                .from("foo.com:project:dataset.table")
                .withMethod(Method.DIRECT_READ)
                .withFormat(DataFormat.ARROW)
                .withTestServices(
                    new FakeBigQueryServices()
                        .withDatasetService(fakeDatasetService)
                        .withStorageClient(fakeStorageClient)));

    PAssert.that(output)
        .containsInAnyOrder(
            ImmutableList.of(
                KV.of("A", 1L),
                KV.of("B", 2L),
                KV.of("C", 3L),
                KV.of("D", 4L),
                KV.of("E", 5L),
                KV.of("F", 6L),
                KV.of("G", 7L)));

    p.run();
  }
+
+  @Test
+  public void testReadFromStreamSourceArrow() throws Exception {
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSession")
+            .setArrowSchema(
+                ArrowSchema.newBuilder()
+                    .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))
+                    .build())
+            .setDataFormat(DataFormat.ARROW)
+            .build();
+
+    ReadRowsRequest expectedRequest =
+        ReadRowsRequest.newBuilder().setReadStream("readStream").build();
+
+    List<String> names = Arrays.asList("A", "B", "C");
+    List<Long> values = Arrays.asList(1L, 2L, 3L);
+    List<ReadRowsResponse> responses =
+        Lists.newArrayList(
+            createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), 
values.subList(0, 2), 0.0, 0.50),
+            createResponseArrow(
+                ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.5, 
0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(expectedRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses));
+
+    List<ReadStream> streamBundle =
+        
Lists.newArrayList(ReadStream.newBuilder().setName("readStream").build());
+    BigQueryStorageStreamBundleSource<TableRow> streamSource =
+        BigQueryStorageStreamBundleSource.create(
+            readSession,
+            streamBundle,
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
+            1L);
+
+    List<TableRow> rows = new ArrayList<>();
+    BoundedReader<TableRow> reader = streamSource.createReader(options);
+    for (boolean hasNext = reader.start(); hasNext; hasNext = 
reader.advance()) {
+      rows.add(reader.getCurrent());
+    }
+
+    System.out.println("Rows: " + rows);
+
+    assertEquals(3, rows.size());
+  }
+
  // Verifies getFractionConsumed() reporting for a single-stream Arrow bundle, including an
  // empty response and gaps between response progress intervals. The expected fractions below
  // are the values the reader reports after each row; the exact interpolation is implementation
  // behavior — the intervals here (0.0-0.25, 0.25-0.25 empty, 0.3-0.5, 0.7-1.0) were chosen to
  // exercise it.
  @Test
  public void testFractionConsumedWithArrowAndOneStreamInBundle() throws Exception {
    ReadSession readSession =
        ReadSession.newBuilder()
            .setName("readSession")
            .setArrowSchema(
                ArrowSchema.newBuilder()
                    .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))
                    .build())
            .setDataFormat(DataFormat.ARROW)
            .build();

    ReadRowsRequest expectedRequest =
        ReadRowsRequest.newBuilder().setReadStream("readStream").build();

    List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
    List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
    List<ReadRowsResponse> responses =
        Lists.newArrayList(
            createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25),
            // An empty response: no rows, zero-width progress interval.
            createResponseArrow(
                ARROW_SCHEMA, Lists.newArrayList(), Lists.newArrayList(), 0.25, 0.25),
            createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.3, 0.5),
            createResponseArrow(ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(expectedRequest, ""))
        .thenReturn(new FakeBigQueryServerStream<>(responses));

    List<ReadStream> streamBundle =
        Lists.newArrayList(ReadStream.newBuilder().setName("readStream").build());
    BigQueryStorageStreamBundleSource<TableRow> streamSource =
        BigQueryStorageStreamBundleSource.create(
            readSession,
            streamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    BoundedReader<TableRow> reader = streamSource.createReader(options);

    // Before call to BoundedReader#start, fraction consumed must be zero.
    assertEquals(0.0, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.start()); // Reads A.
    assertEquals(0.125, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads B.
    assertEquals(0.25, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads C.
    assertEquals(0.4, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads D.
    assertEquals(0.5, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads E.
    assertEquals(0.8, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads F.
    assertEquals(0.9, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads G.
    assertEquals(1.0, reader.getFractionConsumed(), DELTA);

    assertFalse(reader.advance()); // Reaches the end.

    // We are done with the stream, so we should report 100% consumption.
    assertEquals(Double.valueOf(1.0), reader.getFractionConsumed());
  }
+
  // Verifies getFractionConsumed() reporting across a two-stream Arrow bundle: stream 1 carries
  // rows A-D (including an empty mid-stream response) and stream 2 carries E-G. The asserted
  // values show per-stream progress being scaled into an overall bundle fraction.
  @Test
  public void testFractionConsumedWithArrowAndMultipleStreamsInBundle() throws Exception {
    ReadSession readSession =
        ReadSession.newBuilder()
            .setName("readSession")
            .setArrowSchema(
                ArrowSchema.newBuilder()
                    .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))
                    .build())
            .setDataFormat(DataFormat.ARROW)
            .build();

    ReadRowsRequest expectedRequestOne =
        ReadRowsRequest.newBuilder().setReadStream("readStream1").build();
    ReadRowsRequest expectedRequestTwo =
        ReadRowsRequest.newBuilder().setReadStream("readStream2").build();

    List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
    List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
    List<ReadRowsResponse> responsesOne =
        Lists.newArrayList(
            createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.5),
            // An empty response: no rows, zero-width progress interval.
            createResponseArrow(ARROW_SCHEMA, Lists.newArrayList(), Lists.newArrayList(), 0.5, 0.5),
            createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.5, 1.0));

    List<ReadRowsResponse> responsesTwo =
        Lists.newArrayList(
            createResponseArrow(ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.0, 1.0));

    StorageClient fakeStorageClient = mock(StorageClient.class);
    when(fakeStorageClient.readRows(expectedRequestOne, ""))
        .thenReturn(new FakeBigQueryServerStream<>(responsesOne));
    when(fakeStorageClient.readRows(expectedRequestTwo, ""))
        .thenReturn(new FakeBigQueryServerStream<>(responsesTwo));

    List<ReadStream> streamBundle =
        Lists.newArrayList(
            ReadStream.newBuilder().setName("readStream1").build(),
            ReadStream.newBuilder().setName("readStream2").build());

    BigQueryStorageStreamBundleSource<TableRow> streamSource =
        BigQueryStorageStreamBundleSource.create(
            readSession,
            streamBundle,
            TABLE_SCHEMA,
            new TableRowParser(),
            TableRowJsonCoder.of(),
            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
            1L);

    BoundedReader<TableRow> reader = streamSource.createReader(options);

    // Before call to BoundedReader#start, fraction consumed must be zero.
    assertEquals(0.0, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.start()); // Reads A.
    assertEquals(0.125, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads B.
    assertEquals(0.25, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads C.
    assertEquals(0.375, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads D.
    assertEquals(0.5, reader.getFractionConsumed(), DELTA);

    assertTrue(reader.advance()); // Reads E.
    assertEquals(0.6666666666666666, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads F.
    assertEquals(0.8333333333333333, reader.getFractionConsumed(), DELTA);
    assertTrue(reader.advance()); // Reads G.
    assertEquals(1.0, reader.getFractionConsumed(), DELTA);

    assertFalse(reader.advance()); // Reaches the end.

    // We are done with the streams, so we should report 100% consumption.
    assertEquals(Double.valueOf(1.0), reader.getFractionConsumed());
  }
+
+  @Test
+  public void 
testStreamSourceSplitAtFractionWithArrowAndMultipleStreamsInBundle()
+      throws Exception {
+    List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
+    List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
+    List<ReadRowsResponse> responses =
+        Lists.newArrayList(
+            createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), 
values.subList(0, 2), 0.0, 0.6),
+            createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), 
values.subList(2, 3), 0.6, 1.0),
+            createResponseArrow(ARROW_SCHEMA, names.subList(3, 6), 
values.subList(3, 6), 0.0, 1.0),
+            createResponseArrow(ARROW_SCHEMA, names.subList(6, 7), 
values.subList(6, 7), 0.0, 1.0));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream1").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(0, 2)));
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream2").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(2, 3)));
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream3").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(3, 4)));
+
+    List<ReadStream> primaryStreamBundle =
+        Lists.newArrayList(
+            ReadStream.newBuilder().setName("readStream1").build(),
+            ReadStream.newBuilder().setName("readStream2").build(),
+            ReadStream.newBuilder().setName("readStream3").build());
+
+    BigQueryStorageStreamBundleSource<TableRow> primarySource =
+        BigQueryStorageStreamBundleSource.create(
+            ReadSession.newBuilder()
+                .setName("readSession")
+                .setArrowSchema(
+                    ArrowSchema.newBuilder()
+                        
.setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))
+                        .build())
+                .setDataFormat(DataFormat.ARROW)
+                .build(),
+            primaryStreamBundle,
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
+            1L);
+
+    // Read a few records from the primary bundle and ensure that records are 
returned in the
+    // prescribed order.
+    BoundedReader<TableRow> primary = primarySource.createReader(options);
+    assertTrue(primary.start());
+    assertEquals("A", primary.getCurrent().get("name"));
+    assertTrue(primary.advance());
+    assertEquals("B", primary.getCurrent().get("name"));
+    assertTrue(primary.advance());
+
+    // Now split the StreamBundle, and ensure that the returned source points 
to a non-null
+    // secondary StreamBundle.
+    BoundedSource<TableRow> secondarySource = primary.splitAtFraction(0.35);
+    assertNotNull(secondarySource);
+    BoundedReader<TableRow> secondary = secondarySource.createReader(options);
+
+    assertEquals("C", primary.getCurrent().get("name"));
+    assertFalse(primary.advance());
+
+    assertTrue(secondary.start());
+    assertEquals("D", secondary.getCurrent().get("name"));
+    assertTrue(secondary.advance());
+    assertEquals("E", secondary.getCurrent().get("name"));
+    assertTrue(secondary.advance());
+    assertEquals("F", secondary.getCurrent().get("name"));
+    assertTrue((secondary.advance()));
+    assertEquals("G", secondary.getCurrent().get("name"));
+    assertFalse((secondary.advance()));
+  }
+
+  @Test
+  public void 
testStreamSourceSplitAtFractionRepeatedWithArrowAndMultipleStreamsInBundle()
+      throws Exception {
+    List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G");
+    List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
+    List<ReadRowsResponse> responses =
+        Lists.newArrayList(
+            createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), 
values.subList(0, 2), 0.0, 0.6),
+            createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), 
values.subList(2, 3), 0.6, 1.0),
+            createResponseArrow(ARROW_SCHEMA, names.subList(3, 6), 
values.subList(3, 6), 0.0, 1.0),
+            createResponseArrow(ARROW_SCHEMA, names.subList(6, 7), 
values.subList(6, 7), 0.0, 1.0));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream1").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(0, 2)));
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream2").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(2, 3)));
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream3").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(3, 4)));
+
+    List<ReadStream> primaryStreamBundle =
+        Lists.newArrayList(
+            ReadStream.newBuilder().setName("readStream1").build(),
+            ReadStream.newBuilder().setName("readStream2").build(),
+            ReadStream.newBuilder().setName("readStream3").build());
+
+    BigQueryStorageStreamBundleSource<TableRow> primarySource =
+        BigQueryStorageStreamBundleSource.create(
+            ReadSession.newBuilder()
+                .setName("readSession")
+                .setArrowSchema(
+                    ArrowSchema.newBuilder()
+                        
.setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))
+                        .build())
+                .setDataFormat(DataFormat.ARROW)
+                .build(),
+            primaryStreamBundle,
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
+            1L);
+
+    // Read a few records from the primary bundle and ensure that records are 
returned in the
+    // prescribed order.
+    BoundedReader<TableRow> primary = primarySource.createReader(options);
+    assertTrue(primary.start());
+    assertEquals("A", primary.getCurrent().get("name"));
+    assertTrue(primary.advance());
+    assertEquals("B", primary.getCurrent().get("name"));
+    assertTrue(primary.advance());
+
+    // Now split the StreamBundle, and ensure that the returned source points 
to a non-null
+    // secondary StreamBundle. Since there are 3 streams in this Bundle, 
splitting will only
+    // occur when fraction >= 0.33.
+    BoundedSource<TableRow> secondarySource = primary.splitAtFraction(0.35);
+    assertNotNull(secondarySource);
+    BoundedReader<TableRow> secondary = secondarySource.createReader(options);
+
+    assertEquals("C", primary.getCurrent().get("name"));
+    assertFalse(primary.advance());
+
+    assertTrue(secondary.start());
+    assertEquals("D", secondary.getCurrent().get("name"));
+    assertTrue(secondary.advance());
+    assertEquals("E", secondary.getCurrent().get("name"));
+    assertTrue(secondary.advance());
+
+    // Now split the StreamBundle again, and ensure that the returned source 
points to a non-null
+    // tertiary StreamBundle. Since there are 2 streams in this Bundle, 
splitting will only
+    // occur when fraction >= 0.5.
+    BoundedSource<TableRow> tertiarySource = secondary.splitAtFraction(0.5);
+    assertNotNull(tertiarySource);
+    BoundedReader<TableRow> tertiary = tertiarySource.createReader(options);
+
+    assertEquals("F", secondary.getCurrent().get("name"));
+    assertFalse((secondary.advance()));
+
+    assertTrue(tertiary.start());
+    assertEquals("G", tertiary.getCurrent().get("name"));
+    assertFalse((tertiary.advance()));
+  }
+
+  @Test
+  public void 
testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPointArrow()
+      throws Exception {
+    List<String> names = Arrays.asList("A", "B", "C", "D", "E");
+    List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L);
+    List<ReadRowsResponse> responses =
+        Lists.newArrayList(
+            createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), 
values.subList(0, 2), 0.0, 0.66),
+            createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), 
values.subList(2, 3), 0.66, 1.0),
+            createResponseArrow(ARROW_SCHEMA, names.subList(3, 5), 
values.subList(3, 5), 0.0, 1.0));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class);
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream1").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(0, 2)));
+    when(fakeStorageClient.readRows(
+            ReadRowsRequest.newBuilder().setReadStream("readStream2").build(), 
""))
+        .thenReturn(new FakeBigQueryServerStream<>(responses.subList(2, 3)));
+
+    List<ReadStream> parentStreamBundle =
+        Lists.newArrayList(
+            ReadStream.newBuilder().setName("readStream1").build(),
+            ReadStream.newBuilder().setName("readStream2").build());
+
+    BigQueryStorageStreamBundleSource<TableRow> streamBundleSource =
+        BigQueryStorageStreamBundleSource.create(
+            ReadSession.newBuilder()
+                .setName("readSession")
+                .setArrowSchema(
+                    ArrowSchema.newBuilder()
+                        
.setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA))
+                        .build())
+                .setDataFormat(DataFormat.ARROW)
+                .build(),
+            parentStreamBundle,
+            TABLE_SCHEMA,
+            new TableRowParser(),
+            TableRowJsonCoder.of(),
+            new FakeBigQueryServices().withStorageClient(fakeStorageClient),
+            1L);
+
+    // Read a few records from the parent bundle and ensure the records are 
returned in
+    // the prescribed order.
+    BoundedReader<TableRow> primary = streamBundleSource.createReader(options);
+    assertTrue(primary.start());
+    assertEquals("A", primary.getCurrent().get("name"));
+    assertTrue(primary.advance());
+    assertEquals("B", primary.getCurrent().get("name"));
+    assertTrue(primary.advance());
+    assertEquals("C", primary.getCurrent().get("name"));
+    assertTrue(primary.advance());
+    assertEquals("D", primary.getCurrent().get("name"));
+
+    // We attempt to split the StreamBundle after starting to read the 
contents of the second
+    // stream.
+    BoundedSource<TableRow> secondarySource = primary.splitAtFraction(0.5);
+    assertNull(secondarySource);
+
+    assertTrue(primary.advance());
+    assertEquals("E", primary.getCurrent().get("name"));
+    assertFalse(primary.advance());
+  }
+
+  @Test
+  public void testActuateProjectionPushdown() {
+    org.apache.beam.sdk.schemas.Schema schema =
+        org.apache.beam.sdk.schemas.Schema.builder()
+            .addStringField("foo")
+            .addStringField("bar")
+            .build();
+    TypedRead<Row> read =
+        BigQueryIO.read(
+                record ->
+                    BigQueryUtils.toBeamRow(
+                        record.getRecord(), schema, 
ConversionOptions.builder().build()))
+            .withMethod(Method.DIRECT_READ)
+            .withCoder(SchemaCoder.of(schema));
+
+    assertTrue(read.supportsProjectionPushdown());
+    PTransform<PBegin, PCollection<Row>> pushdownT =
+        read.actuateProjectionPushdown(
+            ImmutableMap.of(new TupleTag<>("output"), 
FieldAccessDescriptor.withFieldNames("foo")));
+
+    TypedRead<Row> pushdownRead = (TypedRead<Row>) pushdownT;
+    assertEquals(Method.DIRECT_READ, pushdownRead.getMethod());
+    assertThat(pushdownRead.getSelectedFields().get(), 
Matchers.containsInAnyOrder("foo"));
+    assertTrue(pushdownRead.getProjectionPushdownApplied());
+  }
+
+  @Test
+  public void testReadFromQueryDoesNotSupportProjectionPushdown() {
+    org.apache.beam.sdk.schemas.Schema schema =
+        org.apache.beam.sdk.schemas.Schema.builder()
+            .addStringField("foo")
+            .addStringField("bar")
+            .build();
+    TypedRead<Row> read =
+        BigQueryIO.read(
+                record ->
+                    BigQueryUtils.toBeamRow(
+                        record.getRecord(), schema, 
ConversionOptions.builder().build()))
+            .fromQuery("SELECT bar FROM `dataset.table`")
+            .withMethod(Method.DIRECT_READ)
+            .withCoder(SchemaCoder.of(schema));
+
+    assertFalse(read.supportsProjectionPushdown());
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            read.actuateProjectionPushdown(
+                ImmutableMap.of(
+                    new TupleTag<>("output"), 
FieldAccessDescriptor.withFieldNames("foo"))));
+  }
+
+  private static org.apache.arrow.vector.types.pojo.Field field(
+      String name,
+      boolean nullable,
+      ArrowType type,
+      org.apache.arrow.vector.types.pojo.Field... children) {
+    return new org.apache.arrow.vector.types.pojo.Field(
+        name,
+        new org.apache.arrow.vector.types.pojo.FieldType(nullable, type, null, 
null),
+        asList(children));
+  }
+
+  static org.apache.arrow.vector.types.pojo.Field field(
+      String name, ArrowType type, org.apache.arrow.vector.types.pojo.Field... 
children) {
+    return field(name, false, type, children);
+  }
+}


Reply via email to