[ 
https://issues.apache.org/jira/browse/BEAM-6841?focusedWorklogId=219830&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-219830
 ]

ASF GitHub Bot logged work on BEAM-6841:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Mar/19 05:56
            Start Date: 28/Mar/19 05:56
    Worklog Time Spent: 10m 
      Work Description: kmjung commented on pull request #8061: [BEAM-6841] Add support for reading query results using the BigQuery storage API.
URL: https://github.com/apache/beam/pull/8061#discussion_r269865391
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 ##########
 @@ -914,6 +933,181 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
       return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
     }
 
+    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
+      ValueProvider<TableReference> tableProvider = getTableProvider();
+      Pipeline p = input.getPipeline();
+      if (tableProvider != null) {
+        // No job ID is required. Read directly from BigQuery storage.
+        return p.apply(
+            org.apache.beam.sdk.io.Read.from(
+                BigQueryStorageTableSource.create(
+                    tableProvider,
+                    getReadOptions(),
+                    getParseFn(),
+                    outputCoder,
+                    getBigQueryServices())));
+      }
+
+      checkArgument(
+          getReadOptions() == null,
+          "Invalid BigQueryIO.Read: Specifies table read options, "
+              + "which only applies when reading from a table");
+
+      //
+      // N.B. All of the code below exists because the BigQuery storage API can't (yet) read from
+      // all anonymous tables, so we need the job ID to reason about the name of the destination
+      // table for the query to read the data and subsequently delete the table and dataset. Once
+      // the storage API can handle anonymous tables, the storage source should be modified to use
+      // anonymous tables and all of the code related to job ID generation and table and dataset
+      // cleanup can be removed.
+      //
+
+      PCollectionView<String> jobIdTokenView;
+      PCollection<T> rows;
+
+      if (!getWithTemplateCompatibility()) {
+        // Create a singleton job ID token at pipeline construction time.
+        String staticJobUuid = BigQueryHelpers.randomUUIDString();
+        jobIdTokenView =
+            p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+                .apply("ViewId", View.asSingleton());
+        // Apply the traditional Source model.
+        rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    createStorageQuerySource(staticJobUuid, outputCoder)));
+      } else {
+        // Create a singleton job ID token at pipeline execution time.
+        PCollection<String> jobIdTokenCollection =
+            p.apply("TriggerIdCreation", Create.of("ignored"))
+                .apply(
+                    "CreateJobId",
+                    MapElements.via(
+                        new SimpleFunction<String, String>() {
+                          @Override
+                          public String apply(String input) {
+                            return BigQueryHelpers.randomUUIDString();
+                          }
+                        }));
+
+        jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+        TupleTag<Stream> streamsTag = new TupleTag<>();
+        TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+        TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+        PCollectionTuple tuple =
+            jobIdTokenCollection.apply(
+                "RunQueryJob",
+                ParDo.of(
+                        new DoFn<String, Stream>() {
+                          @ProcessElement
+                          public void processElement(ProcessContext c) throws Exception {
+                            BigQueryOptions options =
+                                c.getPipelineOptions().as(BigQueryOptions.class);
+                            String jobUuid = c.element();
+                            // Execute the query and get the destination table holding the results.
+                            BigQueryStorageQuerySource<T> querySource =
+                                createStorageQuerySource(jobUuid, outputCoder);
+                            Table queryResultTable = querySource.getTargetTable(options);
+
+                            // Create a read session without specifying a desired stream count and
+                            // let the BigQuery storage server pick the number of streams.
+                            CreateReadSessionRequest request =
+                                CreateReadSessionRequest.newBuilder()
+                                    .setParent("projects/" + options.getProject())
+                                    .setTableReference(
+                                        BigQueryHelpers.toTableRefProto(
+                                            queryResultTable.getTableReference()))
+                                    .setRequestedStreams(0)
+                                    .build();
+
+                            ReadSession readSession;
+                            try (StorageClient storageClient =
+                                getBigQueryServices().getStorageClient(options)) {
+                              readSession = storageClient.createReadSession(request);
+                            }
+
+                            for (Stream stream : readSession.getStreamsList()) {
+                              c.output(stream);
+                            }
+
+                            c.output(readSessionTag, readSession);
+                            c.output(
+                                tableSchemaTag,
+                                BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                          }
+                        })
+                    .withOutputTags(
+                        streamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+        tuple.get(streamsTag).setCoder(ProtoCoder.of(Stream.class));
+        tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
+        tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
+
+        PCollectionView<ReadSession> readSessionView =
+            tuple.get(readSessionTag).apply("ReadSessionView", View.asSingleton());
+        PCollectionView<String> tableSchemaView =
+            tuple.get(tableSchemaTag).apply("TableSchemaView", View.asSingleton());
+
+        rows =
+            tuple
+                .get(streamsTag)
+                .apply(Reshuffle.viaRandomKey())
+                .apply(
+                    ParDo.of(
+                            new DoFn<Stream, T>() {
+                              @ProcessElement
+                              public void processElement(ProcessContext c) throws Exception {
+                                ReadSession readSession = c.sideInput(readSessionView);
+                                TableSchema tableSchema =
+                                    BigQueryHelpers.fromJsonString(
+                                        c.sideInput(tableSchemaView), TableSchema.class);
+                                Stream stream = c.element();
+
+                                BigQueryStorageStreamSource<T> streamSource =
+                                    BigQueryStorageStreamSource.create(
+                                        readSession,
+                                        stream,
+                                        tableSchema,
+                                        getParseFn(),
+                                        outputCoder,
+                                        getBigQueryServices());
+
+                                BoundedSource.BoundedReader<T> reader =
+                                    streamSource.createReader(c.getPipelineOptions());
+                                for (boolean more = reader.start(); more; more = reader.advance()) {
+                                  c.output(reader.getCurrent());
+                                }
+                              }
+                            })
+                        .withSideInputs(readSessionView, tableSchemaView))
+                .setCoder(outputCoder);
+      }
+
+      PassThroughThenCleanup.CleanupOperation cleanupOperation =
+          new CleanupOperation() {
+            @Override
+            void cleanup(ContextContainer c) throws Exception {
+              BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class);
+              String jobUuid = c.getJobId();
+
+              TableReference tempTable =
+                  createTempTableReference(
+                      options.getProject(), createJobIdToken(options.getJobName(), jobUuid));
+
+              DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+              LOG.info("Deleting temporary table with query results {}", tempTable);
+              datasetService.deleteTable(tempTable);
+              LOG.info(
+                  "Deleting temporary dataset with query results {}", tempTable.getDatasetId());
+              datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+            }
+          };
+
+      return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
 
 Review comment:
   Hmm, we don't do this in the existing code path -- the 
PassThroughThenCleanup transform is applied directly to the output rows 
collection. Does that also need fixing up?
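
   For readers unfamiliar with the pattern under discussion, here is a rough plain-Java analogy of "pass the rows through, then clean up". It is only a sketch: the class and method names are hypothetical, it uses no Beam APIs, and the real PassThroughThenCleanup transform is a distributed PTransform that may differ in how and when cleanup is triggered. It only illustrates that the cleanup step (deleting the temporary table and dataset) runs once, after every row has been forwarded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy only; none of these names are Beam APIs.
class PassThroughThenCleanupSketch {

  /** Forwards every row to the sink, then runs the cleanup step exactly once. */
  static <T> void passThroughThenCleanup(
      Iterable<T> rows, Consumer<T> sink, Runnable cleanup) {
    for (T row : rows) {
      sink.accept(row);
    }
    // Runs only after all rows have been forwarded, mirroring how the temporary
    // table and dataset are deleted once the query results have been read.
    cleanup.run();
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    passThroughThenCleanup(
        Arrays.asList("row-1", "row-2"),
        events::add,
        () -> events.add("delete temp table and dataset"));
    // The cleanup marker is recorded after both rows.
    System.out.println(events);
  }
}
```

   In this analogy the cleanup is attached directly to the row output, which is the arrangement the comment above describes for the existing code path.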
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 219830)
    Time Spent: 3h 50m  (was: 3h 40m)

> Support reading query results with the BigQuery storage API
> -----------------------------------------------------------
>
>                 Key: BEAM-6841
>                 URL: https://issues.apache.org/jira/browse/BEAM-6841
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Kenneth Jung
>            Assignee: Kenneth Jung
>            Priority: Minor
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
