vachan-shetty commented on code in PR #25392:
URL: https://github.com/apache/beam/pull/25392#discussion_r1102191526
########## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.ReadStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class BigQueryStorageStreamBundleSource<T> extends OffsetBasedSource<T> {
+
+  public static <T> BigQueryStorageStreamBundleSource<T> create(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      TableSchema tableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        streamBundle,
+        toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
+        parseFn,
+        outputCoder,
+        bqServices,
+        minBundleSize);
+  }
+
+  /**
+   * Creates a new source with the same properties as this one, except with a different {@code
+   * List<ReadStream>}.
+   */
+  public BigQueryStorageStreamBundleSource<T> fromExisting(List<ReadStream> newStreamBundle) {
+    return new BigQueryStorageStreamBundleSource<>(
+        readSession,
+        newStreamBundle,
+        jsonTableSchema,
+        parseFn,
+        outputCoder,
+        bqServices,
+        getMinBundleSize());
+  }
+
+  private final ReadSession readSession;
+  private final List<ReadStream> streamBundle;
+  private final String jsonTableSchema;
+  private final SerializableFunction<SchemaAndRecord, T> parseFn;
+  private final Coder<T> outputCoder;
+  private final BigQueryServices bqServices;
+
+  private BigQueryStorageStreamBundleSource(
+      ReadSession readSession,
+      List<ReadStream> streamBundle,
+      String jsonTableSchema,
+      SerializableFunction<SchemaAndRecord, T> parseFn,
+      Coder<T> outputCoder,
+      BigQueryServices bqServices,
+      long minBundleSize) {
+    // The underlying OffsetBasedSource (and its RangeTracker) operates only on the StreamBundle
+    // and NOT on the Streams that constitute the StreamBundle. More specifically, the offsets
+    // used by the OffsetBasedSource are indices into the StreamBundle List.
+    super(0, streamBundle.size(), minBundleSize);
+    this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
+    this.streamBundle = Preconditions.checkArgumentNotNull(streamBundle, "streamBundle");
+    this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
+    this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
+    this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
+    this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+  }
+
+  @Override
+  public Coder<T> getOutputCoder() {
+    return outputCoder;
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
+        .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"));
+    for (ReadStream readStream : streamBundle) {
+      builder.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
+    }
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) {
+    // The size of a stream source can't be estimated due to server-side liquid sharding.
+    // TODO: Implement progress reporting.
+    return 0L;
+  }
+
+  @Override
+  public List<? extends OffsetBasedSource<T>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) {
+    // A stream source can't be split without reading from it due to server-side liquid sharding.
+    // TODO: Implement dynamic work rebalancing.
+    return ImmutableList.of(this);
+  }
+
+  @Override
+  public long getMaxEndOffset(PipelineOptions options) throws Exception {
+    return this.streamBundle.size();
+  }
+
+  @Override
+  public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+    List<ReadStream> newStreamBundle = streamBundle.subList((int) start, (int) end);
+    return fromExisting(newStreamBundle);
+  }
+
+  @Override
+  public BigQueryStorageStreamBundleReader<T> createReader(PipelineOptions options)
+      throws IOException {
+    return new BigQueryStorageStreamBundleReader<>(this, options.as(BigQueryOptions.class));
+  }
+
+  public static class BigQueryStorageStreamBundleReader<T> extends OffsetBasedReader<T> {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(BigQueryStorageStreamBundleReader.class);
+
+    private final BigQueryStorageReader reader;
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final StorageClient storageClient;
+    private final TableSchema tableSchema;
+
+    private BigQueryStorageStreamBundleSource<T> source;
+    private @Nullable BigQueryServerStream<ReadRowsResponse> responseStream = null;
+    private @Nullable Iterator<ReadRowsResponse> responseIterator = null;
+    private @Nullable T current = null;
+    private int currentStreamBundleIndex;
+    private long currentStreamOffset;
+
+    // Values used for progress reporting.
+    private double fractionOfStreamBundleConsumed;
+
+    private double progressAtResponseStart;
+    private double progressAtResponseEnd;
+    private long rowsConsumedFromCurrentResponse;
+    private long totalRowsInCurrentResponse;
+
+    private @Nullable TableReference tableReference;
+    private @Nullable ServiceCallMetric serviceCallMetric;
+
+    private BigQueryStorageStreamBundleReader(
+        BigQueryStorageStreamBundleSource<T> source, BigQueryOptions options) throws IOException {
+      super(source);
+      this.source = source;
+      this.reader = BigQueryStorageReaderFactory.getReader(source.readSession);
+      this.parseFn = source.parseFn;
+      this.storageClient = source.bqServices.getStorageClient(options);
+      this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class);
+      this.currentStreamBundleIndex = 0;
+      this.fractionOfStreamBundleConsumed = 0d;
+      this.progressAtResponseStart = 0d;
+      this.progressAtResponseEnd = 0d;
+      this.rowsConsumedFromCurrentResponse = 0L;
+      this.totalRowsInCurrentResponse = 0L;
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      if (current == null) {
+        throw new NoSuchElementException();
+      }
+      return current;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      return currentStreamBundleIndex;
+    }
+
+    @Override
+    protected boolean isAtSplitPoint() throws NoSuchElementException {
+      // The start of every Stream within a StreamBundle is defined as a split point. This
+      // implies that we cannot split below the granularity of a single Stream.
+      return currentStreamOffset == 0;
+    }
+
+    @Override
+    public boolean startImpl() throws IOException {
+      return readNextStream();
+    }
+
+    @Override
+    public boolean advanceImpl() throws IOException {
+      Preconditions.checkStateNotNull(responseIterator);
+      currentStreamOffset += totalRowsInCurrentResponse;
+      return readNextRecord();
+    }
+
+    private synchronized boolean readNextStream() throws IOException {

Review Comment:
   Removed the `synchronized` from `readNextStream()`. Also added a Javadoc explaining how this Source handles possible race conditions.
   In summary: the underlying `OffsetBasedSource` and `OffsetRangeTracker` operate only in the split-point space (which in this case is the space of StreamBundle indices). As a result, the `RangeTracker` does NOT interact with the underlying Streams directly, which in turn allows us to rely on the synchronization guarantees provided by the `RangeTracker` class.
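   To make that reasoning concrete, here is a minimal standalone sketch (not part of this PR; the class name `SplitPointSpaceSketch` is hypothetical) using Beam's `org.apache.beam.sdk.io.range.OffsetRangeTracker`. Because record claims and split requests both operate on StreamBundle indices, and the tracker's methods are internally synchronized, the reader itself needs no extra locking:

   ```java
   import org.apache.beam.sdk.io.range.OffsetRangeTracker;

   public class SplitPointSpaceSketch {
     public static void main(String[] args) {
       // Hypothetical bundle of 4 streams: the tracker's offsets are indices
       // into the StreamBundle, never row positions within a Stream.
       OffsetRangeTracker tracker = new OffsetRangeTracker(0, 4);

       // The reader claims each Stream at its first row (a split point), then
       // consumes the remaining rows of that Stream at the same index.
       tracker.tryReturnRecordAt(true, 0);  // first row of stream 0
       tracker.tryReturnRecordAt(false, 0); // subsequent rows of stream 0

       // A concurrent split request can only land on a Stream boundary, and
       // OffsetRangeTracker's methods are synchronized internally.
       System.out.println(tracker.trySplitAtPosition(2)); // true: keep streams [0, 2)

       System.out.println(tracker.tryReturnRecordAt(true, 1)); // true: stream 1 still ours
       System.out.println(tracker.tryReturnRecordAt(true, 2)); // false: stream 2 split away
     }
   }
   ```

   Under this model the only shared state lives inside the tracker, which matches the reasoning above for dropping `synchronized` from `readNextStream()`.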

