[
https://issues.apache.org/jira/browse/BEAM-14035?focusedWorklogId=755633&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755633
]
ASF GitHub Bot logged work on BEAM-14035:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Apr/22 08:08
Start Date: 12/Apr/22 08:08
Worklog Time Spent: 10m
Work Description: angoenka commented on code in PR #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r848042337
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
Review Comment:
I think we can split the configuration into two separate classes,
`BigQuerySchemaTransformReadConfiguration` and
`BigQuerySchemaTransformWriteConfiguration`.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
+
+ /**
+ * Instantiates a {@link Read.Builder} from the SQL query.
+ *
+ * <p>The configuration defaults to useStandardSql=true.
+ */
+ public static Read.Builder createQueryBuilder(String query) {
Review Comment:
I think we can get away with only one read builder, `createReadBuilder`.
We also need to validate at build time that `JobType` is consistent with the
other fields (`useStandardSql` and `TableSpec`).
The returned builders are modifiable, so there is no real value in
overloading the create method.
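To make the build-time validation concrete, here is a minimal, dependency-free sketch. It is not the PR's code: `ReadConfigSketch` is a hypothetical hand-written stand-in for the AutoValue-generated `Read` class, and the specific consistency rules are assumptions about what "consistent" should mean for each `JobType`.

```java
/** Hypothetical stand-in for the AutoValue-generated Read configuration. */
final class ReadConfigSketch {
  enum JobType { QUERY, EXTRACT }

  final JobType jobType;
  final String query; // assumed to apply only to QUERY
  final String tableSpec; // assumed to apply only to EXTRACT
  final Boolean useStandardSql; // assumed to apply only to QUERY

  private ReadConfigSketch(Builder b) {
    this.jobType = b.jobType;
    this.query = b.query;
    this.tableSpec = b.tableSpec;
    this.useStandardSql = b.useStandardSql;
  }

  /** Single entry point; callers set every field explicitly on the builder. */
  static Builder createReadBuilder() {
    return new Builder();
  }

  static final class Builder {
    private JobType jobType;
    private String query;
    private String tableSpec;
    private Boolean useStandardSql;

    Builder setJobType(JobType value) { this.jobType = value; return this; }
    Builder setQuery(String value) { this.query = value; return this; }
    Builder setTableSpec(String value) { this.tableSpec = value; return this; }
    Builder setUseStandardSql(Boolean value) { this.useStandardSql = value; return this; }

    /** Checks cross-field consistency before producing the configuration. */
    ReadConfigSketch build() {
      require(jobType != null, "jobType is required");
      switch (jobType) {
        case QUERY:
          require(query != null && !query.isEmpty(), "QUERY requires a non-empty query");
          require(tableSpec == null, "QUERY must not set tableSpec");
          break;
        case EXTRACT:
          require(tableSpec != null && !tableSpec.isEmpty(), "EXTRACT requires a tableSpec");
          require(query == null, "EXTRACT must not set query");
          require(useStandardSql == null, "useStandardSql applies only to QUERY");
          break;
      }
      return new ReadConfigSketch(this);
    }

    private static void require(boolean condition, String message) {
      if (!condition) {
        throw new IllegalStateException(message);
      }
    }
  }
}
```

With AutoValue, the same checks would typically live in a non-abstract `build()` that wraps an abstract `autoBuild()`; that detail is left out here to keep the sketch compilable without an annotation processor.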
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read
jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
Review Comment:
What does the version represent here?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read
jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ private static final String OUTPUT_TAG = "OUTPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Read>
configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Read.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read
configuration) {
+ return new BigQueryReadSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since
+ * no input is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
+ * a single output is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /**
+ * An implementation of {@link SchemaTransform} for BigQuery read jobs
configured using {@link
+ * BigQuerySchemaTransformConfiguration.Read}.
+ */
+ static class BigQueryReadSchemaTransform implements SchemaTransform {
+ private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+ BigQueryReadSchemaTransform(BigQuerySchemaTransformConfiguration.Read
configuration) {
+ this.configuration = configuration;
+ }
+
+ /** Implements {@link SchemaTransform} buildTransform method. */
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple>
buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for BigQuery read jobs configured
using {@link
+ * BigQuerySchemaTransformConfiguration.Read}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+ /** An instance of {@link BigQueryServices} used for testing. */
+ private BigQueryServices testBigQueryServices = null;
+
+ PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Read
configuration) {
+ this.configuration = configuration;
+ }
+
+ void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
Review Comment:
Mark this as `@VisibleForTesting`.
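A minimal sketch of what that could look like. To stay compilable without dependencies, it declares a local `VisibleForTesting` annotation as a stand-in for Guava's `com.google.common.annotations.VisibleForTesting`; `ServicesSketch` and `ReadTransformSketch` are hypothetical stand-ins for `BigQueryServices` and the transform under review.

```java
/** Local stand-in for Guava's VisibleForTesting, declared only so this sketch compiles alone. */
@interface VisibleForTesting {}

/** Hypothetical stand-in for BigQueryServices. */
interface ServicesSketch {
  String name();
}

/** Hypothetical stand-in for the transform that accepts a test-only service. */
final class ReadTransformSketch {
  private ServicesSketch testServices = null;

  /** Package-private; the annotation documents that only tests are expected to call this. */
  @VisibleForTesting
  void setTestServices(ServicesSketch testServices) {
    this.testServices = testServices;
  }

  /** Uses the injected fake when present, otherwise a simulated production service. */
  String resolveServiceName() {
    return testServices != null ? testServices.name() : "production";
  }
}
```

The annotation does not change visibility or behavior; it only signals intent to readers and to static-analysis tools.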
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery
write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ static final String INPUT_TAG = "INPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Write>
configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Write.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ return new BigQueryWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+ * single input is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
+ * no output is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based
on a {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class BigQueryWriteSchemaTransform implements SchemaTransform {
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ BigQueryWriteSchemaTransform(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple>
buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for BigQuery write jobs
configured using {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ /** An instance of {@link BigQueryServices} used for testing. */
+ private BigQueryServices testBigQueryServices = null;
+
+ PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ CreateDisposition createDisposition =
configuration.getCreateDispositionEnum();
+ Schema destinationSchema = getDestinationRowSchema(options);
+
+ if (destinationSchema == null) {
+ // We only care if the create disposition implies an existing table
i.e. create never.
+ if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+ throw new InvalidConfigurationException(
+ String.format(
+ "configuration create disposition: %s for table: %s for a
null destination schema",
+ createDisposition, configuration.getTableSpec()));
+ }
+ }
+ }
Review Comment:
```suggestion
      if (destinationSchema == null
          && createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
        // We only care if the create disposition implies an existing table, i.e. create never.
        throw new InvalidConfigurationException(
            String.format(
                "configuration create disposition: %s for table: %s for a null destination schema",
                createDisposition, configuration.getTableSpec()));
      }
    }
```
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
+
+ /**
+ * Instantiates a {@link Read.Builder} from the SQL query.
+ *
+ * <p>The configuration defaults to useStandardSql=true.
+ */
+ public static Read.Builder createQueryBuilder(String query) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.QUERY)
+ .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+ .setQuery(query);
+ }
+
+ /**
+ * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See
{@link
+ * BigQueryIO.TypedRead#from(String)} for the expected format.
+ */
+ public static Read.Builder createExtractBuilder(String tableSpec) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.EXTRACT)
+ .setTableSpec(tableSpec);
+ }
+
+ /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+ public static Read.Builder createExtractBuilder(TableReference
tableReference) {
+ return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+ }
+
+ /**
+ * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See
{@link
+ * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+ */
+ public static Write.Builder createLoadBuilder(
+ String tableSpec, CreateDisposition createDisposition, WriteDisposition
writeDisposition) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+ .setTableSpec(tableSpec)
+ .setCreateDisposition(createDisposition.name())
+ .setWriteDisposition(writeDisposition.name());
+ }
+
+ /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+ public static Write.Builder createLoadBuilder(
+ TableReference toTable,
+ CreateDisposition createDisposition,
+ WriteDisposition writeDisposition) {
+ return createLoadBuilder(
+ BigQueryHelpers.toTableSpec(toTable), createDisposition,
writeDisposition);
+ }
+
+ /**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+ })
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Read {
+ private static final TypeDescriptor<Read> TYPE_DESCRIPTOR =
TypeDescriptor.of(Read.class);
+ private static final SerializableFunction<Read, Row>
ROW_SERIALIZABLE_FUNCTION =
+ AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+ /** Configures the BigQuery job type. */
+ abstract JobType getJobType();
+
+ /** Serializes configuration to a {@link Row}. */
+ Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(this);
+ }
+
+ /** Configures the BigQuery read job with the SQL query. */
+ @Nullable
+ public abstract String getQuery();
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)} for
+ * more details on the expected format.
+ */
+ @Nullable
+ public abstract String getTableSpec();
+
+ /** BigQuery geographic location where the query job will be executed. */
+ @Nullable
+ public abstract String getQueryLocation();
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ @Nullable
+ public abstract Boolean getUseStandardSql();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /** Configures the BigQuery job type. */
+ abstract Builder setJobType(JobType value);
+
+ /** Configures the BigQuery read job with the SQL query. */
+ public abstract Builder setQuery(String value);
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)}
+ * for more details on the expected format.
+ */
+ public abstract Builder setTableSpec(String value);
+
+ /** BigQuery geographic location where the query job will be executed. */
+ public abstract Builder setQueryLocation(String value);
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ public abstract Builder setUseStandardSql(Boolean value);
+
+ /** Builds the {@link Read} configuration. */
+ public abstract Read build();
+ }
+ }
+
+ /**
+ * Configuration for writing to BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformWriteProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Write {
+ private static final
TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+ TYPE_DESCRIPTOR =
TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+ private static final
SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+ ROW_SERIALIZABLE_FUNCTION =
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ /**
+ * Writes to the given table specification. See {@link
BigQueryIO.Write#to(String)}} for the
+ * expected format.
+ */
+ public abstract String getTableSpec();
+
+ /** Specifies whether the table should be created if it does not exist. */
+ public abstract String getCreateDisposition();
+
+ /** Specifies what to do with existing data in the table, in case the
table already exists. */
+ public abstract String getWriteDisposition();
+
+ /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+ TableReference getTableReference() {
+ return BigQueryHelpers.parseTableSpec(getTableSpec());
+ }
+
+ /** Returns the {@link #getCreateDisposition()} as a {@link
CreateDisposition}. */
+ CreateDisposition getCreateDispositionEnum() {
+ return CreateDisposition.valueOf(getCreateDisposition());
+ }
+
+ /** Serializes configuration to a {@link Row}. */
+ Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(this);
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /**
+ * Writes to the given table specification. See {@link
BigQueryIO.Write#to(String)}} for the
+ * expected format.
+ */
+ public abstract Builder setTableSpec(String value);
+
+ /** Specifies whether the table should be created if it does not exist.
*/
+ public abstract Builder setCreateDisposition(String value);
+
+ /** Specifies what to do with existing data in the table, in case the
table already exists. */
+ public abstract Builder setWriteDisposition(String value);
+
+ /** Builds the {@link Write} configuration. */
+ public abstract Write build();
+ }
+ }
+
+ enum JobType {
Review Comment:
Shall we make these public?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
+
+ /**
+ * Instantiates a {@link Read.Builder} from the SQL query.
+ *
+ * <p>The configuration defaults to useStandardSql=true.
+ */
+ public static Read.Builder createQueryBuilder(String query) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.QUERY)
+ .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+ .setQuery(query);
+ }
+
+ /**
+ * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See
{@link
+ * BigQueryIO.TypedRead#from(String)} for the expected format.
+ */
+ public static Read.Builder createExtractBuilder(String tableSpec) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.EXTRACT)
+ .setTableSpec(tableSpec);
+ }
+
+ /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+ public static Read.Builder createExtractBuilder(TableReference
tableReference) {
+ return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+ }
+
+ /**
+ * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See
{@link
+ * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+ */
+ public static Write.Builder createLoadBuilder(
Review Comment:
Same here. We can have a single builder-creation method and do the sanity
check at build time.
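As a rough illustration for the write side, the sketch below keeps the dispositions as strings (as the configuration under review does) and checks them when `build()` is called. `WriteConfigSketch` and `createWriteBuilder` are hypothetical names, and the nested enums are local stand-ins that mirror the constants of `BigQueryIO.Write.CreateDisposition` and `BigQueryIO.Write.WriteDisposition`.

```java
/** Hypothetical stand-in for the AutoValue-generated Write configuration. */
final class WriteConfigSketch {
  // Local stand-ins for the BigQueryIO.Write disposition enums.
  enum CreateDisposition { CREATE_IF_NEEDED, CREATE_NEVER }
  enum WriteDisposition { WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY }

  final String tableSpec;
  final String createDisposition;
  final String writeDisposition;

  private WriteConfigSketch(Builder b) {
    this.tableSpec = b.tableSpec;
    this.createDisposition = b.createDisposition;
    this.writeDisposition = b.writeDisposition;
  }

  /** Single creation method; no overloads per argument type. */
  static Builder createWriteBuilder() {
    return new Builder();
  }

  static final class Builder {
    private String tableSpec;
    private String createDisposition;
    private String writeDisposition;

    Builder setTableSpec(String value) { this.tableSpec = value; return this; }
    Builder setCreateDisposition(String value) { this.createDisposition = value; return this; }
    Builder setWriteDisposition(String value) { this.writeDisposition = value; return this; }

    /** Rejects missing fields and disposition strings that do not name an enum constant. */
    WriteConfigSketch build() {
      if (tableSpec == null || tableSpec.isEmpty()) {
        throw new IllegalStateException("tableSpec is required");
      }
      parse(CreateDisposition.class, createDisposition, "createDisposition");
      parse(WriteDisposition.class, writeDisposition, "writeDisposition");
      return new WriteConfigSketch(this);
    }

    private static <E extends Enum<E>> E parse(Class<E> type, String value, String field) {
      if (value == null) {
        throw new IllegalStateException(field + " is required");
      }
      try {
        return Enum.valueOf(type, value);
      } catch (IllegalArgumentException e) {
        throw new IllegalStateException(field + " has unknown value: " + value, e);
      }
    }
  }
}
```

Failing in `build()` surfaces a bad disposition string when the configuration is created rather than later, when `getCreateDispositionEnum()` would call `valueOf` during pipeline validation.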
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read
jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ private static final String OUTPUT_TAG = "OUTPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Read>
configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Read.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read
configuration) {
+ return new BigQueryReadSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since
+ * no input is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
+ * a single output is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /**
+ * An implementation of {@link SchemaTransform} for BigQuery read jobs
configured using {@link
+ * BigQuerySchemaTransformConfiguration.Read}.
+ */
+ static class BigQueryReadSchemaTransform implements SchemaTransform {
Review Comment:
Can we make this class private?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read
jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ private static final String OUTPUT_TAG = "OUTPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Read>
configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Read.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read
configuration) {
+ return new BigQueryReadSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
Review Comment:
I think this needs to have "read" somewhere in the name.
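As the diff stands, the read and write providers both return `bigquery:v2`, so their identifiers collide. A small sketch of one way to separate them, assuming a three-part format; the `READ` and `WRITE` constants, the segment order, and the `IdentifierSketch` class are hypothetical and not an established Beam convention:

```java
// Hypothetical sketch: the extra segment and its position are assumptions.
final class IdentifierSketch {
  private static final String API = "bigquery";
  private static final String VERSION = "v2";
  private static final String READ = "read";   // hypothetical segment
  private static final String WRITE = "write"; // hypothetical segment

  /** Identifier the read provider could return. */
  static String readIdentifier() {
    return String.format("%s:%s:%s", API, READ, VERSION);
  }

  /** Identifier the write provider could return. */
  static String writeIdentifier() {
    return String.format("%s:%s:%s", API, WRITE, VERSION);
  }
}
```

Any scheme works as long as the two providers end up with distinct, stable strings.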
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
+
+ /**
+ * Instantiates a {@link Read.Builder} from the SQL query.
+ *
+ * <p>The configuration defaults to useStandardSql=true.
+ */
+ public static Read.Builder createQueryBuilder(String query) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.QUERY)
+ .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+ .setQuery(query);
+ }
+
+ /**
+ * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See
{@link
+ * BigQueryIO.TypedRead#from(String)} for the expected format.
+ */
+ public static Read.Builder createExtractBuilder(String tableSpec) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.EXTRACT)
+ .setTableSpec(tableSpec);
+ }
+
+ /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+ public static Read.Builder createExtractBuilder(TableReference
tableReference) {
+ return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+ }
+
+ /**
+ * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See
{@link
+ * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+ */
+ public static Write.Builder createLoadBuilder(
+ String tableSpec, CreateDisposition createDisposition, WriteDisposition
writeDisposition) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+ .setTableSpec(tableSpec)
+ .setCreateDisposition(createDisposition.name())
+ .setWriteDisposition(writeDisposition.name());
+ }
+
+ /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+ public static Write.Builder createLoadBuilder(
+ TableReference toTable,
+ CreateDisposition createDisposition,
+ WriteDisposition writeDisposition) {
+ return createLoadBuilder(
+ BigQueryHelpers.toTableSpec(toTable), createDisposition,
writeDisposition);
+ }
+
+ /**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+ })
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Read {
+ private static final TypeDescriptor<Read> TYPE_DESCRIPTOR =
TypeDescriptor.of(Read.class);
+ private static final SerializableFunction<Read, Row>
ROW_SERIALIZABLE_FUNCTION =
+ AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+ /** Configures the BigQuery job type. */
+ abstract JobType getJobType();
+
+ /** Serializes configuration to a {@link Row}. */
+ Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(this);
+ }
+
+ /** Configures the BigQuery read job with the SQL query. */
+ @Nullable
+ public abstract String getQuery();
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)} for
+ * more details on the expected format.
+ */
+ @Nullable
+ public abstract String getTableSpec();
+
+ /** BigQuery geographic location where the query job will be executed. */
+ @Nullable
+ public abstract String getQueryLocation();
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ @Nullable
+ public abstract Boolean getUseStandardSql();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /** Configures the BigQuery job type. */
+ abstract Builder setJobType(JobType value);
+
+ /** Configures the BigQuery read job with the SQL query. */
+ public abstract Builder setQuery(String value);
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)}
+ * for more details on the expected format.
+ */
+ public abstract Builder setTableSpec(String value);
+
+ /** BigQuery geographic location where the query job will be executed. */
+ public abstract Builder setQueryLocation(String value);
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ public abstract Builder setUseStandardSql(Boolean value);
+
+ /** Builds the {@link Read} configuration. */
+ public abstract Read build();
+ }
+ }
+
+ /**
+ * Configuration for writing to BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformWriteProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Write {
+ private static final
TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+ TYPE_DESCRIPTOR =
TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+ private static final
SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+ ROW_SERIALIZABLE_FUNCTION =
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ /**
+ * Writes to the given table specification. See {@link
BigQueryIO.Write#to(String)}} for the
+ * expected format.
+ */
+ public abstract String getTableSpec();
+
+ /** Specifies whether the table should be created if it does not exist. */
+ public abstract String getCreateDisposition();
+
+ /** Specifies what to do with existing data in the table, in case the
table already exists. */
+ public abstract String getWriteDisposition();
+
+ /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+ TableReference getTableReference() {
+ return BigQueryHelpers.parseTableSpec(getTableSpec());
+ }
+
+ /** Returns the {@link #getCreateDisposition()} as a {@link
CreateDisposition}. */
+ CreateDisposition getCreateDispositionEnum() {
Review Comment:
We can remove this.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
+
+ /**
+ * Instantiates a {@link Read.Builder} from the SQL query.
+ *
+ * <p>The configuration defaults to useStandardSql=true.
+ */
+ public static Read.Builder createQueryBuilder(String query) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.QUERY)
+ .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+ .setQuery(query);
+ }
+
+ /**
+ * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See
{@link
+ * BigQueryIO.TypedRead#from(String)} for the expected format.
+ */
+ public static Read.Builder createExtractBuilder(String tableSpec) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.EXTRACT)
+ .setTableSpec(tableSpec);
+ }
+
+ /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+ public static Read.Builder createExtractBuilder(TableReference
tableReference) {
+ return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+ }
+
+ /**
+ * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See
{@link
+ * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+ */
+ public static Write.Builder createLoadBuilder(
+ String tableSpec, CreateDisposition createDisposition, WriteDisposition
writeDisposition) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+ .setTableSpec(tableSpec)
+ .setCreateDisposition(createDisposition.name())
+ .setWriteDisposition(writeDisposition.name());
+ }
+
+ /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+ public static Write.Builder createLoadBuilder(
+ TableReference toTable,
+ CreateDisposition createDisposition,
+ WriteDisposition writeDisposition) {
+ return createLoadBuilder(
+ BigQueryHelpers.toTableSpec(toTable), createDisposition,
writeDisposition);
+ }
+
+ /**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+ })
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Read {
+ private static final TypeDescriptor<Read> TYPE_DESCRIPTOR =
TypeDescriptor.of(Read.class);
+ private static final SerializableFunction<Read, Row>
ROW_SERIALIZABLE_FUNCTION =
+ AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+ /** Configures the BigQuery job type. */
+ abstract JobType getJobType();
+
+ /** Serializes configuration to a {@link Row}. */
+ Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(this);
+ }
+
+ /** Configures the BigQuery read job with the SQL query. */
+ @Nullable
+ public abstract String getQuery();
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)} for
+ * more details on the expected format.
+ */
+ @Nullable
+ public abstract String getTableSpec();
+
+ /** BigQuery geographic location where the query job will be executed. */
+ @Nullable
+ public abstract String getQueryLocation();
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ @Nullable
+ public abstract Boolean getUseStandardSql();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /** Configures the BigQuery job type. */
+ abstract Builder setJobType(JobType value);
+
+ /** Configures the BigQuery read job with the SQL query. */
+ public abstract Builder setQuery(String value);
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)}
+ * for more details on the expected format.
+ */
+ public abstract Builder setTableSpec(String value);
+
+ /** BigQuery geographic location where the query job will be executed. */
+ public abstract Builder setQueryLocation(String value);
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ public abstract Builder setUseStandardSql(Boolean value);
+
+ /** Builds the {@link Read} configuration. */
+ public abstract Read build();
+ }
+ }
+
+ /**
+ * Configuration for writing to BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformWriteProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Write {
+ private static final
TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+ TYPE_DESCRIPTOR =
TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+ private static final
SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+ ROW_SERIALIZABLE_FUNCTION =
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ /**
+ * Writes to the given table specification. See {@link
BigQueryIO.Write#to(String)}} for the
+ * expected format.
+ */
+ public abstract String getTableSpec();
+
+ /** Specifies whether the table should be created if it does not exist. */
+ public abstract String getCreateDisposition();
+
+ /** Specifies what to do with existing data in the table, in case the
table already exists. */
+ public abstract String getWriteDisposition();
+
+ /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+ TableReference getTableReference() {
+ return BigQueryHelpers.parseTableSpec(getTableSpec());
+ }
+
+ /** Returns the {@link #getCreateDisposition()} as a {@link
CreateDisposition}. */
+ CreateDisposition getCreateDispositionEnum() {
+ return CreateDisposition.valueOf(getCreateDisposition());
+ }
+
+ /** Serializes configuration to a {@link Row}. */
+ Row toBeamRow() {
Review Comment:
We can remove this.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformConfiguration.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Configurations for reading from or writing to BigQuery. */
+public class BigQuerySchemaTransformConfiguration {
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
+
+ /**
+ * Instantiates a {@link Read.Builder} from the SQL query.
+ *
+ * <p>The configuration defaults to useStandardSql=true.
+ */
+ public static Read.Builder createQueryBuilder(String query) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.QUERY)
+ .setUseStandardSql(Read.DEFAULT_USE_STANDARD_SQL)
+ .setQuery(query);
+ }
+
+ /**
+ * Instantiates a {@link Read.Builder} to support BigQuery extract jobs. See
{@link
+ * BigQueryIO.TypedRead#from(String)} for the expected format.
+ */
+ public static Read.Builder createExtractBuilder(String tableSpec) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Read.Builder()
+ .setJobType(JobType.EXTRACT)
+ .setTableSpec(tableSpec);
+ }
+
+ /** Instantiates a {@link Read.Builder} to support BigQuery extract jobs. */
+ public static Read.Builder createExtractBuilder(TableReference
tableReference) {
+ return createExtractBuilder(BigQueryHelpers.toTableSpec(tableReference));
+ }
+
+ /**
+ * Instantiates a {@link Write.Builder} to support BigQuery load jobs. See
{@link
+ * BigQueryIO.Write#to(String)}} for toTableSpec expected format.
+ */
+ public static Write.Builder createLoadBuilder(
+ String tableSpec, CreateDisposition createDisposition, WriteDisposition
writeDisposition) {
+ return new AutoValue_BigQuerySchemaTransformConfiguration_Write.Builder()
+ .setTableSpec(tableSpec)
+ .setCreateDisposition(createDisposition.name())
+ .setWriteDisposition(writeDisposition.name());
+ }
+
+ /** Instantiates a {@link Write.Builder} to support BigQuery load jobs. */
+ public static Write.Builder createLoadBuilder(
+ TableReference toTable,
+ CreateDisposition createDisposition,
+ WriteDisposition writeDisposition) {
+ return createLoadBuilder(
+ BigQueryHelpers.toTableSpec(toTable), createDisposition,
writeDisposition);
+ }
+
+ /**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+ })
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Read {
+ private static final TypeDescriptor<Read> TYPE_DESCRIPTOR =
TypeDescriptor.of(Read.class);
+ private static final SerializableFunction<Read, Row>
ROW_SERIALIZABLE_FUNCTION =
+ AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+ /** Configures the BigQuery job type. */
+ abstract JobType getJobType();
+
+ /** Serializes configuration to a {@link Row}. */
+ Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(this);
+ }
+
+ /** Configures the BigQuery read job with the SQL query. */
+ @Nullable
+ public abstract String getQuery();
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)} for
+ * more details on the expected format.
+ */
+ @Nullable
+ public abstract String getTableSpec();
+
+ /** BigQuery geographic location where the query job will be executed. */
+ @Nullable
+ public abstract String getQueryLocation();
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ @Nullable
+ public abstract Boolean getUseStandardSql();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ /** Configures the BigQuery job type. */
+ abstract Builder setJobType(JobType value);
+
+ /** Configures the BigQuery read job with the SQL query. */
+ public abstract Builder setQuery(String value);
+
+ /**
+ * Specifies a table for a BigQuery read job. See {@link
BigQueryIO.TypedRead#from(String)}
+ * for more details on the expected format.
+ */
+ public abstract Builder setTableSpec(String value);
+
+ /** BigQuery geographic location where the query job will be executed. */
+ public abstract Builder setQueryLocation(String value);
+
+ /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+ public abstract Builder setUseStandardSql(Boolean value);
+
+ /** Builds the {@link Read} configuration. */
+ public abstract Read build();
+ }
+ }
+
+ /**
+ * Configuration for writing to BigQuery.
+ *
+ * <p>This class is meant to be used with {@link
BigQuerySchemaTransformWriteProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the
+ * Beam repository.
+ */
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ public abstract static class Write {
+ private static final
TypeDescriptor<BigQuerySchemaTransformConfiguration.Write>
+ TYPE_DESCRIPTOR =
TypeDescriptor.of(BigQuerySchemaTransformConfiguration.Write.class);
+ private static final
SerializableFunction<BigQuerySchemaTransformConfiguration.Write, Row>
+ ROW_SERIALIZABLE_FUNCTION =
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ /**
+ * Writes to the given table specification. See {@link
BigQueryIO.Write#to(String)}} for the
+ * expected format.
+ */
+ public abstract String getTableSpec();
+
+ /** Specifies whether the table should be created if it does not exist. */
+ public abstract String getCreateDisposition();
+
+ /** Specifies what to do with existing data in the table, in case the
table already exists. */
+ public abstract String getWriteDisposition();
+
+ /** Returns the {@link #getTableSpec()} as a {@link TableReference}. */
+ TableReference getTableReference() {
Review Comment:
We can remove this.
It's generally good to avoid single-line methods that are not exposed.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+ extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ static final String INPUT_TAG = "INPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Write> configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Write.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write configuration) {
+ return new BigQueryWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
Review Comment:
We should have "write" in the name.
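As quoted in this review, both the read and the write provider build their identifier from the same `API` and `VERSION` constants, so they would both return `bigquery:v2`. A minimal sketch of one way to keep them distinct follows. The class name `ProviderIdentifiers`, the `operation` parameter, and the `api:version:operation` layout are assumptions made for illustration, not the naming the PR settled on.

```java
/** Hypothetical sketch: build distinct identifiers for the read and write providers. */
final class ProviderIdentifiers {
  private static final String API = "bigquery";
  private static final String VERSION = "v2";

  private ProviderIdentifiers() {}

  /**
   * Appends the operation ("read" or "write") as a third segment.
   * The segment order is an assumed convention, chosen only for this example.
   */
  static String identifier(String operation) {
    return String.format("%s:%s:%s", API, VERSION, operation);
  }
}
```

With something like this, the write provider's `identifier()` could return `ProviderIdentifiers.identifier("write")` and the read provider's the `"read"` variant, so a lookup by identifier can no longer confuse the two.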
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery
write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ static final String INPUT_TAG = "INPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Write>
configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Write.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ return new BigQueryWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+ * single is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+ * no output is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class BigQueryWriteSchemaTransform implements SchemaTransform {
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ BigQueryWriteSchemaTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ /** An instance of {@link BigQueryServices} used for testing. */
+ private BigQueryServices testBigQueryServices = null;
+
+ PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Write configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ CreateDisposition createDisposition = configuration.getCreateDispositionEnum();
+ Schema destinationSchema = getDestinationRowSchema(options);
+
+ if (destinationSchema == null) {
+ // We only care if the create disposition implies an existing table i.e. create never.
+ if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+ throw new InvalidConfigurationException(
+ String.format(
+ "configuration create disposition: %s for table: %s for a null destination schema",
+ createDisposition, configuration.getTableSpec()));
+ }
+ }
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ validate(input);
+ PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+ Schema schema = rowPCollection.getSchema();
+ BigQueryIO.Write<TableRow> write = toWrite(schema);
+ if (testBigQueryServices != null) {
+ write = write.withTestServices(testBigQueryServices);
+ }
+
+ PCollection<TableRow> tableRowPCollection =
+ rowPCollection.apply(
+ MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+ tableRowPCollection.apply(write);
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+
+ /** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link Schema}. */
+ BigQueryIO.Write<TableRow> toWrite(Schema schema) {
+ TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
+ CreateDisposition createDisposition =
+ CreateDisposition.valueOf(configuration.getCreateDisposition());
+ WriteDisposition writeDisposition =
+ WriteDisposition.valueOf(configuration.getWriteDisposition());
+
+ return BigQueryIO.writeTableRows()
+ .to(configuration.getTableSpec())
+ .withCreateDisposition(createDisposition)
+ .withWriteDisposition(writeDisposition)
+ .withSchema(tableSchema);
+ }
+
+ /** Setter for testing using {@link BigQueryServices}. */
+ void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+ this.testBigQueryServices = testBigQueryServices;
+ }
+
+ /** Validate a {@link PCollectionRowTuple} input. */
+ void validate(PCollectionRowTuple input) {
+ if (!input.has(INPUT_TAG)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s %s is missing expected tag: %s",
+ getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+ }
+
+ validate(input.get(INPUT_TAG));
Review Comment:
I think we can inline this method call.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery
write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ static final String INPUT_TAG = "INPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Write>
configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Write.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ return new BigQueryWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since a
+ * single is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
+ * no output is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based
on a {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class BigQueryWriteSchemaTransform implements SchemaTransform {
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ BigQueryWriteSchemaTransform(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple>
buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for BigQuery write jobs
configured using {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ /** An instance of {@link BigQueryServices} used for testing. */
+ private BigQueryServices testBigQueryServices = null;
+
+ PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ CreateDisposition createDisposition =
configuration.getCreateDispositionEnum();
+ Schema destinationSchema = getDestinationRowSchema(options);
+
+ if (destinationSchema == null) {
+ // We only care if the create disposition implies an existing table
i.e. create never.
+ if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+ throw new InvalidConfigurationException(
+ String.format(
+ "configuration create disposition: %s for table: %s for a
null destination schema",
+ createDisposition, configuration.getTableSpec()));
+ }
+ }
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ validate(input);
+ PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+ Schema schema = rowPCollection.getSchema();
+ BigQueryIO.Write<TableRow> write = toWrite(schema);
+ if (testBigQueryServices != null) {
+ write = write.withTestServices(testBigQueryServices);
+ }
+
+ PCollection<TableRow> tableRowPCollection =
+ rowPCollection.apply(
+
MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+ tableRowPCollection.apply(write);
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+
+ /** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link
Schema}. */
+ BigQueryIO.Write<TableRow> toWrite(Schema schema) {
+ TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
+ CreateDisposition createDisposition =
+ CreateDisposition.valueOf(configuration.getCreateDisposition());
+ WriteDisposition writeDisposition =
+ WriteDisposition.valueOf(configuration.getWriteDisposition());
+
+ return BigQueryIO.writeTableRows()
+ .to(configuration.getTableSpec())
+ .withCreateDisposition(createDisposition)
+ .withWriteDisposition(writeDisposition)
+ .withSchema(tableSchema);
+ }
+
+ /** Setter for testing using {@link BigQueryServices}. */
+ void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+ this.testBigQueryServices = testBigQueryServices;
+ }
+
+ /** Validate a {@link PCollectionRowTuple} input. */
+ void validate(PCollectionRowTuple input) {
+ if (!input.has(INPUT_TAG)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s %s is missing expected tag: %s",
+ getClass().getSimpleName(), input.getClass().getSimpleName(),
INPUT_TAG));
+ }
+
+ validate(input.get(INPUT_TAG));
+ }
+
+ /** Validate a {@link PCollection<Row>} input. */
+ void validate(PCollection<Row> input) {
+ Schema sourceSchema = input.getSchema();
+ if (sourceSchema == null) {
+ throw new IllegalArgumentException(
+ String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
+ }
+
+ Schema destinationSchema = getDestinationRowSchema(input.getPipeline().getOptions());
+
+ // We already evaluate whether we should acquire a destination schema.
+ // See the validate(PipelineOptions) method for details.
+ if (destinationSchema != null) {
+ validateMatching(sourceSchema, destinationSchema);
+ }
+ }
+
+ void validateMatching(Schema sourceSchema, Schema destinationSchema) {
+ Set<String> fieldNames = new HashSet<>();
+ List<String> mismatchedFieldNames = new ArrayList<>();
+ fieldNames.addAll(sourceSchema.getFieldNames());
+ fieldNames.addAll(destinationSchema.getFieldNames());
+ for (String name : fieldNames) {
+ Field gotField = null;
+ Field wantField = null;
+ if (sourceSchema.hasField(name)) {
+ gotField = sourceSchema.getField(name);
+ }
+ if (destinationSchema.hasField(name)) {
+ wantField = destinationSchema.getField(name);
+ }
+ if (!matches(wantField, gotField)) {
+ mismatchedFieldNames.add(name);
+ }
+ }
+
+ if (!mismatchedFieldNames.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "source and destination schema mismatch for table: %s with fields: %s",
+ configuration.getTableSpec(), String.join(" | ", mismatchedFieldNames)));
+ }
+ }
+
+ private boolean matches(Field want, Field got) {
+ if (want == null && got == null) {
+ return true;
+ }
+ if (want == null) {
+ return false;
+ }
+ return want.equals(got);
+ }
+
+ TableSchema getDestinationTableSchema(BigQueryOptions options) {
+ Table destinationTable = getDestinationTable(options);
+ if (destinationTable == null) {
+ return null;
+ }
+ return destinationTable.getSchema();
+ }
+
+ DatasetService getDatasetService(BigQueryOptions options) {
+ BigQueryServices bigQueryServices = testBigQueryServices;
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
+ }
+ return bigQueryServices.getDatasetService(options);
+ }
+
+ Table getDestinationTable(BigQueryOptions options) {
+ Table destinationTable = null;
+ CreateDisposition createDisposition = configuration.getCreateDispositionEnum();
+ DatasetService datasetService = getDatasetService(options);
+ try {
+ destinationTable = datasetService.getTable(configuration.getTableReference());
+ } catch (IOException | InterruptedException e) {
+ // We only care if the create disposition implies an existing table i.e. create never.
+ if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+ throw new InvalidConfigurationException(
+ String.format(
+ "error querying destination schema for create disposition: %s for table: %s, error: %s",
+ createDisposition, configuration.getTableSpec(), e.getMessage()));
+ }
+ }
+ return destinationTable;
+ }
+
+ Schema getDestinationRowSchema(BigQueryOptions options) {
+ TableSchema tableSchema = getDestinationTableSchema(options);
Review Comment:
I think we can inline this.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java:
##########
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery
write jobs configured
+ * using {@link BigQuerySchemaTransformConfiguration.Write}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+ extends
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Write> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ static final String INPUT_TAG = "INPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Write>
configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Write.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ return new BigQueryWriteSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier
method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
inputCollectionNames method. Since a
+ * single is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.singletonList(INPUT_TAG);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider}
outputCollectionNames method. Since
+ * no output is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based
on a {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class BigQueryWriteSchemaTransform implements SchemaTransform {
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ BigQueryWriteSchemaTransform(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple>
buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for BigQuery write jobs
configured using {@link
+ * BigQuerySchemaTransformConfiguration.Write}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final BigQuerySchemaTransformConfiguration.Write configuration;
+
+ /** An instance of {@link BigQueryServices} used for testing. */
+ private BigQueryServices testBigQueryServices = null;
+
+ PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Write
configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public void validate(PipelineOptions options) {
+ CreateDisposition createDisposition =
configuration.getCreateDispositionEnum();
+ Schema destinationSchema = getDestinationRowSchema(options);
+
+ if (destinationSchema == null) {
+ // We only care if the create disposition implies an existing table
i.e. create never.
+ if (createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
+ throw new InvalidConfigurationException(
+ String.format(
+ "configuration create disposition: %s for table: %s for a
null destination schema",
+ createDisposition, configuration.getTableSpec()));
+ }
+ }
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ validate(input);
+ PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+ Schema schema = rowPCollection.getSchema();
+ BigQueryIO.Write<TableRow> write = toWrite(schema);
+ if (testBigQueryServices != null) {
+ write = write.withTestServices(testBigQueryServices);
+ }
+
+ PCollection<TableRow> tableRowPCollection =
+ rowPCollection.apply(
+
MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+ tableRowPCollection.apply(write);
+ return PCollectionRowTuple.empty(input.getPipeline());
+ }
+
+ /** Instantiates a {@link BigQueryIO.Write<TableRow>} from a {@link
Schema}. */
+ BigQueryIO.Write<TableRow> toWrite(Schema schema) {
+ TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
+ CreateDisposition createDisposition =
+ CreateDisposition.valueOf(configuration.getCreateDisposition());
+ WriteDisposition writeDisposition =
+ WriteDisposition.valueOf(configuration.getWriteDisposition());
+
+ return BigQueryIO.writeTableRows()
+ .to(configuration.getTableSpec())
+ .withCreateDisposition(createDisposition)
+ .withWriteDisposition(writeDisposition)
+ .withSchema(tableSchema);
+ }
+
+ /** Setter for testing using {@link BigQueryServices}. */
+ void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+ this.testBigQueryServices = testBigQueryServices;
+ }
+
+ /** Validate a {@link PCollectionRowTuple} input. */
+ void validate(PCollectionRowTuple input) {
+ if (!input.has(INPUT_TAG)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s %s is missing expected tag: %s",
+ getClass().getSimpleName(), input.getClass().getSimpleName(),
INPUT_TAG));
+ }
+
+ validate(input.get(INPUT_TAG));
+ }
+
+ /** Validate a {@link PCollection<Row>} input. */
+ void validate(PCollection<Row> input) {
+ Schema sourceSchema = input.getSchema();
+ if (sourceSchema == null) {
+ throw new IllegalArgumentException(
+ String.format("%s is null for input of tag: %s", Schema.class,
INPUT_TAG));
+ }
+
+ Schema destinationSchema =
getDestinationRowSchema(input.getPipeline().getOptions());
+
+ // We already evaluate whether we should acquire a destination schema.
+ // See the validate(PipelineOptions) method for details.
+ if (destinationSchema != null) {
+ validateMatching(sourceSchema, destinationSchema);
+ }
+ }
+
+ void validateMatching(Schema sourceSchema, Schema destinationSchema) {
+ Set<String> fieldNames = new HashSet<>();
+ List<String> mismatchedFieldNames = new ArrayList<>();
+ fieldNames.addAll(sourceSchema.getFieldNames());
+ fieldNames.addAll(destinationSchema.getFieldNames());
+ for (String name : fieldNames) {
+ Field gotField = null;
+ Field wantField = null;
+ if (sourceSchema.hasField(name)) {
+ gotField = sourceSchema.getField(name);
+ }
+ if (destinationSchema.hasField(name)) {
+ wantField = destinationSchema.getField(name);
+ }
+ if (!matches(wantField, gotField)) {
Review Comment:
I think we can simplify the if conditions in the method and inline it.
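For reference, the quoted `matches(want, got)` returns true when both fields are null, false when only `want` is null, and `want.equals(got)` otherwise, which is the same result `java.util.Objects.equals(want, got)` gives. A minimal sketch of the inlined comparison follows. It is self-contained, so a `Map<String, String>` of field name to type name stands in for Beam's `Schema`, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

/** Sketch only: a Map of field name to type name stands in for Beam's Schema. */
final class SchemaMismatchSketch {
  private SchemaMismatchSketch() {}

  /** Returns the names of fields that are missing on one side or differ between the two sides. */
  static List<String> mismatchedFieldNames(
      Map<String, String> source, Map<String, String> destination) {
    // Union of both name sets; a TreeSet keeps the result order deterministic.
    TreeSet<String> names = new TreeSet<>(source.keySet());
    names.addAll(destination.keySet());

    List<String> mismatched = new ArrayList<>();
    for (String name : names) {
      // Map.get yields null for an absent field. Objects.equals treats
      // null/null as equal and null/non-null as unequal, so no separate
      // matches(want, got) helper is needed.
      if (!Objects.equals(destination.get(name), source.get(name))) {
        mismatched.add(name);
      }
    }
    return mismatched;
  }
}
```

In the actual provider the loop would still read each `Field` from the two `Schema` objects as the quoted code does; only the final comparison collapses to a single `Objects.equals` call.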
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+ extends TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+ private static final String API = "bigquery";
+ private static final String VERSION = "v2";
+ private static final String OUTPUT_TAG = "OUTPUT";
+
+ /** Returns the expected class of the configuration. */
+ @Override
+ protected Class<BigQuerySchemaTransformConfiguration.Read> configurationClass() {
+ return BigQuerySchemaTransformConfiguration.Read.class;
+ }
+
+ /** Returns the expected {@link SchemaTransform} of the configuration. */
+ @Override
+ protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read configuration) {
+ return new BigQueryReadSchemaTransform(configuration);
+ }
+
+ /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+ @Override
+ public String identifier() {
+ return String.format("%s:%s", API, VERSION);
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+ * no input is expected, this returns an empty list.
+ */
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+ * a single output is expected, this returns a list with a single name.
+ */
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG);
+ }
+
+ /**
+ * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+ * BigQuerySchemaTransformConfiguration.Read}.
+ */
+ static class BigQueryReadSchemaTransform implements SchemaTransform {
+ private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+ BigQueryReadSchemaTransform(BigQuerySchemaTransformConfiguration.Read configuration) {
+ this.configuration = configuration;
+ }
+
+ /** Implements {@link SchemaTransform} buildTransform method. */
+ @Override
+ public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+ return new PCollectionRowTupleTransform(configuration);
+ }
+ }
+
+ /**
+ * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+ * BigQuerySchemaTransformConfiguration.Read}.
+ */
+ static class PCollectionRowTupleTransform
+ extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+ private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+ /** An instance of {@link BigQueryServices} used for testing. */
+ private BigQueryServices testBigQueryServices = null;
+
+ PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Read configuration) {
+ this.configuration = configuration;
+ }
+
+ void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+ this.testBigQueryServices = testBigQueryServices;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ if (!input.getAll().isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s %s input is expected to be empty",
+ input.getClass().getSimpleName(), getClass().getSimpleName()));
+ }
+
+ BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+ if (testBigQueryServices != null) {
+ read = read.withTestServices(testBigQueryServices).withoutValidation();
+ }
+
+ PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(read);
+ Schema schema = tableRowPCollection.getSchema();
+ PCollection<Row> rowPCollection =
+ tableRowPCollection.apply(
+ MapElements.into(TypeDescriptor.of(Row.class))
+ .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
+ return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection.setRowSchema(schema));
+ }
+
+ BigQueryIO.TypedRead<TableRow> toTypedRead() {
+ JobType jobType = configuration.getJobType();
+ switch (jobType) {
+ case QUERY:
+ return toQueryTypedRead();
+
+ case EXTRACT:
+ return toExtractTypedRead();
+
+ default:
+ throw new InvalidConfigurationException(
+ String.format("invalid job type for BigQueryIO read, got: %s", jobType));
+ }
+ }
+
+ private BigQueryIO.TypedRead<TableRow> toExtractTypedRead() {
+ return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
+ }
+
+ private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {
+ String query = Objects.requireNonNull(configuration.getQuery());
+
+ BigQueryIO.TypedRead<TableRow> read = BigQueryIO.readTableRowsWithSchema().fromQuery(query);
+
+ if (configuration.getQueryLocation() != null) {
Review Comment:
```suggestion
if (configuration.getQueryLocation() != null &&
!configuration.getQueryLocation().isEmpty()) {
```
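For reference, a minimal sketch of why the extra `isEmpty()` check matters: an empty string is non-null, so the original guard would still pass "" on as a query location. `LocationGuardSketch` and `hasLocation` are hypothetical names, and only standard Java is used here:

```java
// Hypothetical helper illustrating the null-or-empty guard from the suggestion.
final class LocationGuardSketch {

  // True only when a location is actually present: neither null nor "".
  static boolean hasLocation(String queryLocation) {
    return queryLocation != null && !queryLocation.isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(hasLocation(null));
    System.out.println(hasLocation(""));
    System.out.println(hasLocation("US"));
  }
}
```

Guava's `Strings.isNullOrEmpty` expresses the same check in a single call, if that reads better in this class.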
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+ private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {
+ String query = Objects.requireNonNull(configuration.getQuery());
+
+ BigQueryIO.TypedRead<TableRow> read = BigQueryIO.readTableRowsWithSchema().fromQuery(query);
+
+ if (configuration.getQueryLocation() != null) {
+ read = read.withQueryLocation(configuration.getQueryLocation());
+ }
+
+ if (configuration.getUseStandardSql() != null) {
+ if (configuration.getUseStandardSql()) {
+ read = read.usingStandardSql();
+ }
+ }
Review Comment:
```suggestion
if (configuration.getUseStandardSql() != null &&
configuration.getUseStandardSql()) {
read = read.usingStandardSql();
}
```
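An alternative, sketched under the assumption that `getUseStandardSql()` returns a nullable `Boolean` (as the null check in the hunk implies): `Boolean.TRUE.equals(...)` folds the null check and the unboxing into one call. `StandardSqlFlagSketch` and `useStandardSql` are hypothetical names:

```java
// Hypothetical helper illustrating a null-safe test of a nullable Boolean flag.
final class StandardSqlFlagSketch {

  // Boolean.TRUE.equals(null) is false, so no separate null check is needed
  // and a null flag never triggers an unboxing NullPointerException.
  static boolean useStandardSql(Boolean flag) {
    return Boolean.TRUE.equals(flag);
  }

  public static void main(String[] args) {
    System.out.println(useStandardSql(null));
    System.out.println(useStandardSql(Boolean.FALSE));
    System.out.println(useStandardSql(Boolean.TRUE));
  }
}
```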
Issue Time Tracking
-------------------
Worklog Id: (was: 755633)
Time Spent: 5h (was: 4h 50m)
> Convert BigQuery SchemaIO to SchemaTransform
> --------------------------------------------
>
> Key: BEAM-14035
> URL: https://issues.apache.org/jira/browse/BEAM-14035
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Lara Schmidt
> Assignee: Damon Douglas
> Priority: P2
> Time Spent: 5h
> Remaining Estimate: 0h
>
> The output of this task is to refactor
> org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider [1] to implement
> SchemaTransform [2] and SchemaTransformProvider interfaces [3].
> Please see https://issues.apache.org/jira/browse/BEAM-14168 for more
> information on additional work remaining after this task is complete.
> As a result of this task:
> 1. The PR will delete one class and create four classes, split by whether we
> are reading from or writing to BigQuery via BigQueryIO.Read and
> BigQueryIO.Write, respectively.
> * delete:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
> 2. The testing strategy will use the
> org.apache.beam.sdk.transforms.display.DisplayData class instead of the
> org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.
> We will test whether the input of BigQuerySchemaTransformReadConfiguration
> yields a BigQueryIO.Read with correct query, tableSpec, useLegacySQL
> (defaulted to false), and queryLocation values.
> We will test whether the input of BigQuerySchemaTransformWriteConfiguration
> yields a BigQueryIO.Write with correct tableReference, createDisposition, and
> writeDisposition values.
> References:
> 1 -
> [sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java|https://github.com/apache/beam/blob/fc00b9697a0dfb73a03981f4d6c2c8dd1e316d5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java]
> 2 -
> [SchemaTransform|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java]
> 3 -
> [SchemaTransformProvider|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java]
--
This message was sent by Atlassian Jira
(v8.20.1#820001)