RustedBones commented on code in PR #32360:
URL: https://github.com/apache/beam/pull/32360#discussion_r1743277559
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1265,35 +1413,62 @@ public PCollection<T> expand(PBegin input) {
         checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
       }

-      checkArgument(getDatumReaderFactory() != null, "A readerDatumFactory is required");
+      if (getMethod() != TypedRead.Method.DIRECT_READ) {
+        checkArgument(
+            getSelectedFields() == null,
+            "Invalid BigQueryIO.Read: Specifies selected fields, "
+                + "which only applies when using Method.DIRECT_READ");
+
+        checkArgument(
+            getRowRestriction() == null,
+            "Invalid BigQueryIO.Read: Specifies row restriction, "
+                + "which only applies when using Method.DIRECT_READ");
+      } else if (getTableProvider() == null) {
+        checkArgument(
+            getSelectedFields() == null,
+            "Invalid BigQueryIO.Read: Specifies selected fields, "
+                + "which only applies when reading from a table");
+
+        checkArgument(
+            getRowRestriction() == null,
+            "Invalid BigQueryIO.Read: Specifies row restriction, "
+                + "which only applies when reading from a table");
+      }

-      // if both toRowFn and fromRowFn values are set, enable Beam schema support
       Pipeline p = input.getPipeline();
       BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
       final BigQuerySourceDef sourceDef = createSourceDef();

+      // schema may need to be requested during graph creation to infer coder or beam schema
+      TableSchema tableSchema = null;
+
+      // read table schema and infer coder if possible
+      Coder<T> c;
+      if (getCoder() == null) {
+        tableSchema = requestTableSchema(sourceDef, bqOptions, getSelectedFields());

Review Comment:
   Is it fine to access the BQ table at graph creation time? (It was already doing that when the Beam schema was requested.)

##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java:
##########
@@ -42,15 +40,15 @@ interface BigQuerySourceDef extends Serializable {
   <T> BigQuerySourceBase<T> toSource(
       String stepUuid,
       Coder<T> coder,
-      SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
+      BigQueryReaderFactory<T> readerFactory,
       boolean useAvroLogicalTypes);

   /**
-   * Extract the Beam {@link Schema} corresponding to this source.
+   * Extract the {@link TableSchema} corresponding to this source.
    *
    * @param bqOptions BigQueryOptions
-   * @return Beam schema of the source
+   * @return table schema of the source
    * @throws BigQuerySchemaRetrievalException if schema retrieval fails
    */
-  Schema getBeamSchema(BigQueryOptions bqOptions);
+  TableSchema getTableSchema(BigQueryOptions bqOptions);

Review Comment:
   As this is in the BQ realm, it makes more sense to return the unaltered `TableSchema`.
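   For illustration, returning the raw `TableSchema` does not lose the Beam-schema path: callers can still derive a Beam `Schema` at the call site. A minimal sketch follows (the wrapper class and method names are hypothetical; the conversion relies on Beam's existing public `BigQueryUtils.fromTableSchema`):

```java
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.schemas.Schema;

// Illustrative only: the source definition stays in the BigQuery realm and hands
// back the unaltered TableSchema; conversion to a Beam Schema happens only where
// Beam-schema support is actually requested.
final class TableSchemaToBeamSchemaSketch {

  static Schema toBeamSchema(TableSchema tableSchema) {
    // Beam's existing public conversion from a BQ TableSchema to a Beam Schema.
    return BigQueryUtils.fromTableSchema(tableSchema);
  }

  private TableSchemaToBeamSchemaSketch() {}
}
```

   A call site such as `expand()` could then apply this conversion to the result of `getTableSchema(bqOptions)` only when a Beam schema is needed, keeping `BigQuerySourceDef` free of Beam-schema concerns.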
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1731,7 +1870,7 @@ public void processElement(ProcessContext c) throws Exception {
                       .setTable(
                           BigQueryHelpers.toTableResourceName(
                               queryResultTable.getTableReference()))
-                      .setDataFormat(DataFormat.AVRO))

Review Comment:
   Was Arrow even supported?

##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryReaderFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.arrow.ArrowConversion;
+import org.apache.beam.sdk.extensions.avro.io.AvroSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.SerializableSupplier;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+abstract class BigQueryReaderFactory<T> implements BigQueryStorageReaderFactory<T>, Serializable {
+
+  // TODO make file source params generic (useAvroLogicalTypes)
+  abstract BoundedSource<T> getSource(
+      MatchResult.Metadata metadata,
+      TableSchema tableSchema,
+      Boolean useAvroLogicalTypes,

Review Comment:
   Any proposal here? If there is a plan to support CSV export, for instance, we'd have to pass the chosen `field_delimiter`.

##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java:
##########

Review Comment:
   Main changes here:
   - Generating an Avro schema from a table schema now has 2 options:
     - with logical types, as done in the BQ direct read, and in BQ export with `use_logical_type`
     - without logical types, as done in BQ export. This conversion is destructive, as many types fall back to `String`.
   - Converting `GenericRecord` to `TableRow` changed. It now expects the logical-type schema and thus can drop the need for the `TableSchema` during conversion.
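   To make the two Avro-schema options concrete, here is a minimal sketch of the field schema a BigQuery `DATE` column maps to under each option. It is illustrative only and written against the plain Avro API; the `string` fallback in the second case is an assumption based on the BQ export mapping, the authoritative table being the one implemented in `BigQueryAvroUtils`.

```java
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Illustrative only: shows the shape of the generated Avro field schema for a
// BigQuery DATE column under the two conversion options discussed above.
final class AvroDateMappingSketch {

  // With logical types (direct read, or export with use_logical_type):
  // DATE stays an int annotated with the Avro "date" logical type.
  static Schema dateWithLogicalTypes() {
    return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
  }

  // Without logical types (plain export, assumed mapping): DATE falls back to a
  // plain string, losing the original BQ type information.
  static Schema dateWithoutLogicalTypes() {
    return Schema.create(Schema.Type.STRING);
  }

  private AvroDateMappingSketch() {}
}
```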