vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1109110017
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java:
##########
@@ -156,16 +161,36 @@ public List<BigQueryStorageStreamSource<T>> split(
try (StorageClient client = bqServices.getStorageClient(bqOptions)) {
readSession = client.createReadSession(createReadSessionRequest);
LOG.info(
- "Sent BigQuery Storage API CreateReadSession request '{}'; received response '{}'.",
- createReadSessionRequest,
- readSession);
+ "Sent BigQuery Storage API CreateReadSession request in code '{}'.",
+ createReadSessionRequest);
+ LOG.info(
+ "Received number of streams in response: '{}'.",
readSession.getStreamsList().size());
}
if (readSession.getStreamsList().isEmpty()) {
// The underlying table is empty or all rows have been pruned.
return ImmutableList.of();
}
+ streamCount = readSession.getStreamsList().size();
+ int streamsPerBundle = 0;
+ double bytesPerStream = 0;
+ LOG.info(
+ "readSession.getEstimatedTotalBytesScanned(): '{}'",
+ readSession.getEstimatedTotalBytesScanned());
+ if (bqOptions.getEnableBundling()) {
+ if (desiredBundleSizeBytes > 0) {
+ bytesPerStream =
+ (double) readSession.getEstimatedTotalBytesScanned() / readSession.getStreamsCount();
+ LOG.info("bytesPerStream: '{}'", bytesPerStream);
Review Comment:
Done.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1585,6 +1527,233 @@ void cleanup(ContextContainer c) throws Exception {
return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
}
+ private PCollectionTuple createTupleForDirectRead(
+ PCollection<String> jobIdTokenCollection,
+ Coder<T> outputCoder,
+ TupleTag<ReadStream> readStreamsTag,
+ TupleTag<ReadSession> readSessionTag,
+ TupleTag<String> tableSchemaTag) {
+ PCollectionTuple tuple =
+ jobIdTokenCollection.apply(
+ "RunQueryJob",
+ ParDo.of(
+ new DoFn<String, ReadStream>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ BigQueryOptions options =
+ c.getPipelineOptions().as(BigQueryOptions.class);
+ String jobUuid = c.element();
+ // Execute the query and get the destination table holding the results.
+ // The getTargetTable call runs a new instance of the query and returns
+ // the destination table created to hold the results.
+ BigQueryStorageQuerySource<T> querySource =
+ createStorageQuerySource(jobUuid, outputCoder);
+ Table queryResultTable = querySource.getTargetTable(options);
+
+ // Create a read session without specifying a desired stream count and
+ // let the BigQuery storage server pick the number of streams.
+ CreateReadSessionRequest request =
+ CreateReadSessionRequest.newBuilder()
+ .setParent(
+ BigQueryHelpers.toProjectResourceName(
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject()))
+ .setReadSession(
+ ReadSession.newBuilder()
+ .setTable(
+ BigQueryHelpers.toTableResourceName(
+ queryResultTable.getTableReference()))
+ .setDataFormat(DataFormat.AVRO))
+ .setMaxStreamCount(0)
+ .build();
+
+ ReadSession readSession;
+ try (StorageClient storageClient =
+ getBigQueryServices().getStorageClient(options)) {
+ readSession = storageClient.createReadSession(request);
+ }
+
+ for (ReadStream readStream : readSession.getStreamsList()) {
+ c.output(readStream);
+ }
+
+ c.output(readSessionTag, readSession);
+ c.output(
+ tableSchemaTag,
+ BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+ }
+ })
+ .withOutputTags(
+ readStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+ return tuple;
+ }
+
+ private PCollectionTuple createTupleForDirectReadWithStreamBundle(
+ PCollection<String> jobIdTokenCollection,
+ Coder<T> outputCoder,
+ TupleTag<List<ReadStream>> listReadStreamsTag,
+ TupleTag<ReadSession> readSessionTag,
+ TupleTag<String> tableSchemaTag) {
+
+ PCollectionTuple tuple =
+ jobIdTokenCollection.apply(
+ "RunQueryJob",
+ ParDo.of(
+ new DoFn<String, List<ReadStream>>() {
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
+ BigQueryOptions options =
+ c.getPipelineOptions().as(BigQueryOptions.class);
+ String jobUuid = c.element();
+ // Execute the query and get the destination table holding the results.
+ // The getTargetTable call runs a new instance of the query and returns
+ // the destination table created to hold the results.
+ BigQueryStorageQuerySource<T> querySource =
+ createStorageQuerySource(jobUuid, outputCoder);
+ Table queryResultTable = querySource.getTargetTable(options);
+
+ // Create a read session without specifying a desired stream count and
+ // let the BigQuery storage server pick the number of streams.
+ CreateReadSessionRequest request =
+ CreateReadSessionRequest.newBuilder()
+ .setParent(
+ BigQueryHelpers.toProjectResourceName(
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject()))
+ .setReadSession(
+ ReadSession.newBuilder()
+ .setTable(
+ BigQueryHelpers.toTableResourceName(
+ queryResultTable.getTableReference()))
+ .setDataFormat(DataFormat.AVRO))
+ .setMaxStreamCount(0)
+ .build();
+
+ ReadSession readSession;
+ try (StorageClient storageClient =
+ getBigQueryServices().getStorageClient(options)) {
+ readSession = storageClient.createReadSession(request);
+ }
+ int streamIndex = 0;
+ int streamsPerBundle = 10;
+ List<ReadStream> streamBundle = Lists.newArrayList();
+ for (ReadStream readStream : readSession.getStreamsList()) {
+ streamIndex++;
+ streamBundle.add(readStream);
+ if (streamIndex % streamsPerBundle == 0) {
+ c.output(streamBundle);
+ streamBundle = Lists.newArrayList();
+ }
+ }
+
Review Comment:
Nice catch! Added your suggested change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]