Repository: beam Updated Branches: refs/heads/master 0a073af40 -> 66b864f2b
Move stripping code into BigQueryHelpers and add better unit-test coverage. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fb6417f8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fb6417f8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fb6417f8 Branch: refs/heads/master Commit: fb6417f81482d22ef1cff9505a6589360e506dc0 Parents: 0f50eb7 Author: Reuven Lax <[email protected]> Authored: Tue Sep 19 20:24:18 2017 -0700 Committer: Reuven Lax <[email protected]> Committed: Thu Sep 21 20:16:22 2017 -0700 ---------------------------------------------------------------------- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 8 ++++ .../beam/sdk/io/gcp/bigquery/CreateTables.java | 8 ++-- .../sdk/io/gcp/bigquery/TableDestination.java | 11 ----- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 47 ++++++++++++++++---- .../sdk/io/gcp/bigquery/FakeDatasetService.java | 15 ++++++- 5 files changed, 65 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 7f9e27a..02a47c2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -112,6 +112,14 @@ public class BigQueryHelpers { return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE")); } + /** + * Strip off any partition decorator information from a tablespec. + */ + public static String stripPartitionDecorator(String tableSpec) { + int index = tableSpec.lastIndexOf('$'); + return (index == -1) ? tableSpec : tableSpec.substring(0, index); + } + static String jobToPrettyString(@Nullable Job job) throws IOException { return job == null ? "null" : job.toPrettyString(); } http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index aff5ff1..fedd2fe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -113,9 +113,7 @@ public class CreateTables<DestinationT> private void possibleCreateTable( BigQueryOptions options, TableDestination tableDestination, TableSchema tableSchema) throws InterruptedException, IOException { - String tableSpec = tableDestination.getStrippedTableSpec(); - TableReference tableReference = tableDestination.getTableReference(); - String tableDescription = tableDestination.getTableDescription(); + String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec()); if (createDisposition != createDisposition.CREATE_NEVER && !createdTables.contains(tableSpec)) { synchronized (createdTables) { // Another thread may have succeeded in creating the table in the meanwhile, so @@ -123,6 +121,10 @@ public class CreateTables<DestinationT> // every thread from attempting a create and overwhelming our BigQuery quota. DatasetService datasetService = bqServices.getDatasetService(options); if (!createdTables.contains(tableSpec)) { + TableReference tableReference = tableDestination.getTableReference(); + String tableDescription = tableDestination.getTableDescription(); + tableReference.setTableId( + BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId())); if (datasetService.getTable(tableReference) == null) { Table table = new Table() .setTableReference(tableReference) http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index 4a4f66b..ecc34d3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -30,7 +30,6 @@ import javax.annotation.Nullable; public class TableDestination implements Serializable { private static final long serialVersionUID = 1L; private final String tableSpec; - @Nullable private String strippedTableSpec; @Nullable private final String tableDescription; @Nullable @@ -60,24 +59,14 @@ public class TableDestination implements Serializable { public TableDestination(String tableSpec, @Nullable String tableDescription, @Nullable String jsonTimePartitioning) { this.tableSpec = tableSpec; - this.strippedTableSpec = null; this.tableDescription = tableDescription; this.jsonTimePartitioning = jsonTimePartitioning; } - public String getTableSpec() { return tableSpec; } - public String getStrippedTableSpec() { - if (strippedTableSpec == null) { - int index = tableSpec.lastIndexOf('$'); - strippedTableSpec = (index == -1) ? tableSpec : tableSpec.substring(0, index); - } - return strippedTableSpec; - } - public TableReference getTableReference() { return BigQueryHelpers.parseTableSpec(tableSpec); } http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 7927282..ad4cbaa 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -670,7 +670,7 @@ public class BigQueryIOTest implements Serializable { .setFields( ImmutableList.of( new TableFieldSchema().setName("number").setType("INTEGER"))); - p.apply(Create.of(row1, row1)) + p.apply(Create.of(row1, row2)) .apply( BigQueryIO.writeTableRows() .to("project-id:dataset-id.table-id") @@ -1820,7 +1820,7 @@ public class BigQueryIOTest implements Serializable { options.setTempLocation(baseDir.toString()); - List<TableRow> read = convertBigDecimaslToLong( + List<TableRow> read = convertBigDecimalsToLong( SourceTestUtils.readFromSource(bqSource, options)); assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class))); SourceTestUtils.assertSplitAtFractionBehavior( @@ -2329,7 +2329,7 @@ public class BigQueryIOTest implements Serializable { IntervalWindow.getCoder())); } - List<TableRow> convertBigDecimaslToLong(List<TableRow> toConvert) { + List<TableRow> convertBigDecimalsToLong(List<TableRow> toConvert) { // The numbers come back as BigDecimal objects after JSON serialization. Change them back to // longs so that we can assert the output. List<TableRow> converted = Lists.newArrayList(); @@ -2345,13 +2345,42 @@ public class BigQueryIOTest implements Serializable { } @Test - public void testTableDecoratorStripping() { - TableDestination tableDestination = tableDestination = new TableDestination( - "project:dataset.table$decorator", ""); - assertEquals("project:dataset.table", tableDestination.getStrippedTableSpec()); + public void testWriteToTableDecorator() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("project-id"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); - tableDestination = new TableDestination("project:dataset.table", ""); - assertEquals("project:dataset.table", tableDestination.getStrippedTableSpec()); + FakeDatasetService datasetService = new FakeDatasetService(); + FakeBigQueryServices fakeBqServices = + new FakeBigQueryServices() + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); + datasetService.createDataset("project-id", "dataset-id", "", ""); + Pipeline p = TestPipeline.create(bqOptions); + TableRow row1 = new TableRow().set("name", "a").set("number", "1"); + TableRow row2 = new TableRow().set("name", "b").set("number", "2"); + + TableSchema schema = new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER"))); + p.apply(Create.of(row1, row2)) + .apply( + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table-id$decorator") + .withTestServices(fakeBqServices) + .withMethod(Method.STREAMING_INSERTS) + .withSchema(schema) + .withoutValidation()); + p.run(); + } + + @Test + public void testTableDecoratorStripping() { + assertEquals("project:dataset.table", + BigQueryHelpers.stripPartitionDecorator("project:dataset.table$decorator")); + assertEquals("project:dataset.table", + BigQueryHelpers.stripPartitionDecorator("project:dataset.table")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/fb6417f8/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java index bcd84f7..323f663 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java @@ -36,6 +36,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Pattern; import javax.annotation.Nullable; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy.Context; @@ -111,7 +112,17 @@ class FakeDatasetService implements DatasetService, Serializable { @Override public void createTable(Table table) throws IOException { + final Pattern tableRegexp = Pattern.compile("[-\\w]{1,1024}"); + TableReference tableReference = table.getTableReference(); + if (!tableRegexp.matcher(tableReference.getTableId()).matches()) { + throw new IOException( + String.format( + "invalid table ID %s. Table IDs must be alphanumeric " + + "(plus underscores) and must be at most 1024 characters long. Also, table" + + " decorators cannot be used.", + tableReference.getTableId())); + } synchronized (BigQueryIOTest.tables) { Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(tableReference.getProjectId(), tableReference.getDatasetId()); @@ -202,7 +213,9 @@ class FakeDatasetService implements DatasetService, Serializable { long dataSize = 0; TableContainer tableContainer = getTableContainer( - ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); + ref.getProjectId(), + ref.getDatasetId(), + BigQueryHelpers.stripPartitionDecorator(ref.getTableId())); for (int i = 0; i < rowList.size(); ++i) { TableRow row = rowList.get(i).getValue(); List<TableDataInsertAllResponse.InsertErrors> allErrors = insertErrors.get(row);
