[ 
https://issues.apache.org/jira/browse/BEAM-14035?focusedWorklogId=757246&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-757246
 ]

ASF GitHub Bot logged work on BEAM-14035:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Apr/22 22:44
            Start Date: 14/Apr/22 22:44
    Worklog Time Spent: 10m 
      Work Description: damondouglas commented on code in PR #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r850867016


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read 
jobs configured using
+ * {@link BigQuerySchemaTransformConfiguration.Read}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it 
will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be 
implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends 
TypedSchemaTransformProvider<BigQuerySchemaTransformConfiguration.Read> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformConfiguration.Read> 
configurationClass() {
+    return BigQuerySchemaTransformConfiguration.Read.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformConfiguration.Read 
configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier 
method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} 
outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs 
configured using {@link
+   * BigQuerySchemaTransformConfiguration.Read}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformConfiguration.Read 
configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> 
buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured 
using {@link
+   * BigQuerySchemaTransformConfiguration.Read}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformConfiguration.Read configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformConfiguration.Read 
configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();
+      }
+
+      PCollection<TableRow> tableRowPCollection = 
input.getPipeline().apply(read);
+      Schema schema = tableRowPCollection.getSchema();
+      PCollection<Row> rowPCollection =
+          tableRowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((tableRow) -> BigQueryUtils.toBeamRow(schema, 
tableRow)));
+      return PCollectionRowTuple.of(OUTPUT_TAG, 
rowPCollection.setRowSchema(schema));
+    }
+
+    BigQueryIO.TypedRead<TableRow> toTypedRead() {
+      JobType jobType = configuration.getJobType();
+      switch (jobType) {
+        case QUERY:
+          return toQueryTypedRead();
+
+        case EXTRACT:
+          return toExtractTypedRead();
+
+        default:
+          throw new InvalidConfigurationException(
+              String.format("invalid job type for BigQueryIO read, got: %s", 
jobType));
+      }
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toExtractTypedRead() {
+      return 
BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {
+      String query = Objects.requireNonNull(configuration.getQuery());
+
+      BigQueryIO.TypedRead<TableRow> read = 
BigQueryIO.readTableRowsWithSchema().fromQuery(query);
+
+      if (configuration.getQueryLocation() != null) {

Review Comment:
   We would then need to do the same for getQuery() and getTableSpec() methods 
as well.  Should we rely on the validation to validate non-emptiness?  In this 
method do we just want to make sure we are not assigning null values?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 757246)
    Time Spent: 5.5h  (was: 5h 20m)

> Convert BigQuery SchemaIO to SchemaTransform
> --------------------------------------------
>
>                 Key: BEAM-14035
>                 URL: https://issues.apache.org/jira/browse/BEAM-14035
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Lara Schmidt
>            Assignee: Damon Douglas
>            Priority: P2
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> The output of this task is to refactor 
> org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider [1] to implement 
> SchemaTransform [2] and SchemaTransformProvider interfaces [3].
> Please see https://issues.apache.org/jira/browse/BEAM-14168 for more 
> information on additional work remaining after this task is complete.
> As a result of this task there will be:
> 1. one class deletion and four classes created for the PR reflecting whether 
> we are reading from or writing to BigQuery, via BigQueryIO.Read and 
> BigQueryIO.Write, respectively.
> * delete: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
> * create: 
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
> 2. The testing strategy will leverage the use of the 
> org.apache.beam.sdk.transforms.display.DisplayData class instead of using the 
> org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.  
> We will test whether the input of BigQuerySchemaTransformReadConfiguration 
> yields a BigQueryIO.Read with correct query, tableSpec, useLegacySQL 
> (defaulted to false), and queryLocation values.  
> We will test whether the input of BigQuerySchemaTransformWriteConfiguration 
> yields a BigQueryIO.Write with correct tableReference, createDisposition, and 
> writeDisposition values.
> References:
> 1 - 
> [sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java|https://github.com/apache/beam/blob/fc00b9697a0dfb73a03981f4d6c2c8dd1e316d5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java]
> 2 - 
> [SchemaTransform|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java]
> 3 - 
> [SchemaTransformProvider|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to