vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1102191267


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1592,196 @@ void cleanup(ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, 
jobIdTokenView));
     }
 
+    private PCollection<T> expandAnonForDirectReadWithStreamBundle(
+        PBegin input, Coder<T> outputCoder, Schema beamSchema) {
+
+      Pipeline p = input.getPipeline();
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", 
View.asSingleton());
+
+        TupleTag<List<ReadStream>> listReadStreamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =

Review Comment:
   I will try to address this in a follow-up commit. Due to the urgency of
the customer issue, I might also make the clean-up changes in a follow-up PR.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -180,19 +205,34 @@ public List<BigQueryStorageStreamSource<T>> split(
       throw new IllegalArgumentException(
           "data is not in a supported dataFormat: " + 
readSession.getDataFormat());
     }
-
+    int streamIndex = 0;
     Preconditions.checkStateNotNull(
         targetTable); // TODO: this is inconsistent with method above, where 
it can be null
     TableSchema trimmedSchema =
         BigQueryAvroUtils.trimBigQueryTableSchema(targetTable.getSchema(), 
sessionSchema);
-    List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
-    for (ReadStream readStream : readSession.getStreamsList()) {
-      sources.add(
-          BigQueryStorageStreamSource.create(
-              readSession, readStream, trimmedSchema, parseFn, outputCoder, 
bqServices));
+    if (!bqOptions.getEnableBundling()) {
+      List<BigQueryStorageStreamSource<T>> sources = Lists.newArrayList();
+      for (ReadStream readStream : readSession.getStreamsList()) {
+        sources.add(
+            BigQueryStorageStreamSource.create(
+                readSession, readStream, trimmedSchema, parseFn, outputCoder, 
bqServices));
+      }
+      return ImmutableList.copyOf(sources);
+    } else {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to