[
https://issues.apache.org/jira/browse/BEAM-14035?focusedWorklogId=753055&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-753055
]
ASF GitHub Bot logged work on BEAM-14035:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Apr/22 18:54
Start Date: 05/Apr/22 18:54
Worklog Time Spent: 10m
Work Description: damondouglas commented on code in PR #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r843158545
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformConfiguration.Read;
+import
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
+import org.apache.beam.sdk.transforms.display.DisplayData.Item;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+ private static final String PROJECT = "fakeproject";
+ private static final String DATASET = "fakedataset";
+ private static final String TABLE_ID = "faketable";
+
+ private static final String QUERY = "select * from
`fakeproject.fakedataset.faketable`";
+ private static final String LOCATION = "kingdom-of-figaro";
+
+ private static final TableReference TABLE_REFERENCE =
+ new
TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
+
+ private static final String TABLE_SPEC =
BigQueryHelpers.toTableSpec(TABLE_REFERENCE);
+
+ private static final Schema SCHEMA =
+ Schema.of(Field.of("name", FieldType.STRING), Field.of("number",
FieldType.INT64));
+
+ private static final List<TableRow> RECORDS =
+ Arrays.asList(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L));
+
+ private static final List<Row> ROWS =
+ Arrays.asList(
+ Row.withSchema(SCHEMA).withFieldValue("name",
"a").withFieldValue("number", 1L).build(),
+ Row.withSchema(SCHEMA).withFieldValue("name",
"b").withFieldValue("number", 2L).build(),
+ Row.withSchema(SCHEMA).withFieldValue("name",
"c").withFieldValue("number", 3L).build());
+
+ private static final TableSchema TABLE_SCHEMA =
BigQueryUtils.toTableSchema(SCHEMA);
+ private static final BigQueryOptions OPTIONS =
+ TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+ private final FakeDatasetService fakeDatasetService = new
FakeDatasetService();
+ private final FakeJobService fakeJobService = new FakeJobService();
+ private final Table fakeTable = new Table();
+ private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ private final FakeBigQueryServices fakeBigQueryServices =
+ new FakeBigQueryServices()
+ .withJobService(fakeJobService)
+ .withDatasetService(fakeDatasetService);
+
+ @Before
+ public void setUp() throws IOException, InterruptedException {
+ FakeDatasetService.setUp();
+ FakeJobService.setUp();
+ BigQueryIO.clearCreatedTables();
+ fakeTable.setSchema(TABLE_SCHEMA);
+ fakeTable.setTableReference(TABLE_REFERENCE);
+ fakeTable.setNumBytes(1024L * 1024L);
+ fakeDatasetService.createDataset(PROJECT, DATASET, "", "", null);
+ fakeDatasetService.createTable(fakeTable);
+ fakeDatasetService.insertAll(fakeTable.getTableReference(), RECORDS, null);
+ temporaryFolder.create();
+ OPTIONS.setProject(PROJECT);
+ OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+ }
+
+ @After
+ public void tearDown() {
+ temporaryFolder.delete();
+ }
+
+ @Rule
+ public transient TestPipeline p =
+ TestPipeline.fromOptions(OPTIONS).enableAbandonedNodeEnforcement(false);
+
+ @Test
+ public void testQuery() {
+ // TODO: refactor this test using FakeBigQueryServices.
Review Comment:
@angoenka Should I create a JIRA ticket for this?
Issue Time Tracking
-------------------
Worklog Id: (was: 753055)
Time Spent: 4h 50m (was: 4h 40m)
> Convert BigQuery SchemaIO to SchemaTransform
> --------------------------------------------
>
> Key: BEAM-14035
> URL: https://issues.apache.org/jira/browse/BEAM-14035
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Lara Schmidt
> Assignee: Damon Douglas
> Priority: P2
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> The output of this task is to refactor
> org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaIOProvider [1] to implement
> SchemaTransform [2] and SchemaTransformProvider interfaces [3].
> Please see https://issues.apache.org/jira/browse/BEAM-14168 for more
> information on additional work remaining after this task is complete.
> As a result of this task there will be:
> 1. One class deleted and four classes created for the PR, reflecting whether
> we are reading from or writing to BigQuery via BigQueryIO.Read and
> BigQueryIO.Write, respectively.
> * delete:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
> * create:
> sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
> 2. The testing strategy will use the
> org.apache.beam.sdk.transforms.display.DisplayData class instead of
> org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.
> We will test whether the input of BigQuerySchemaTransformReadConfiguration
> yields a BigQueryIO.Read with correct query, tableSpec, useLegacySQL
> (defaulted to false), and queryLocation values.
> We will test whether the input of BigQuerySchemaTransformWriteConfiguration
> yields a BigQueryIO.Write with correct tableReference, createDisposition, and
> writeDisposition values.
> References:
> 1 -
> [sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java|https://github.com/apache/beam/blob/fc00b9697a0dfb73a03981f4d6c2c8dd1e316d5e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaIOProvider.java]
> 2 -
> [SchemaTransform|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java]
> 3 -
> [SchemaTransformProvider|https://github.com/apache/beam/blob/a47a725863fc8c37a9b8520cebeef83677fe531d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.java]
--
This message was sent by Atlassian Jira
(v8.20.1#820001)