Repository: beam Updated Branches: refs/heads/master 1c26b7488 -> 6280d497b
Add support for TimePartitioning in BigQueryIO.write(). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b0e03a33 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b0e03a33 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b0e03a33 Branch: refs/heads/master Commit: b0e03a33cf0c2c573a2d34d88506e19ebb28c934 Parents: 1c26b74 Author: Reuven Lax <[email protected]> Authored: Sun Jul 30 21:42:59 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Aug 29 14:05:15 2017 -0700 ---------------------------------------------------------------------- pom.xml | 2 +- ...aultCoderCloudObjectTranslatorRegistrar.java | 2 + .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 4 +- .../sdk/io/gcp/bigquery/BigQueryHelpers.java | 8 +++ .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 47 ++++++++++++++++ .../beam/sdk/io/gcp/bigquery/CreateTables.java | 15 +++-- .../bigquery/DynamicDestinationsHelpers.java | 27 ++++++++- .../sdk/io/gcp/bigquery/TableDestination.java | 39 ++++++++++++- .../io/gcp/bigquery/TableDestinationCoder.java | 2 + .../gcp/bigquery/TableDestinationCoderV2.java | 59 ++++++++++++++++++++ .../beam/sdk/io/gcp/bigquery/WriteTables.java | 7 ++- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 50 +++++++++++++++++ .../sdk/io/gcp/bigquery/FakeJobService.java | 32 +++++++++-- .../sdk/io/gcp/bigquery/TableContainer.java | 2 + 14 files changed, 278 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 81c8003..b563f8c 100644 --- a/pom.xml +++ b/pom.xml @@ -107,7 +107,7 @@ <apex.kryo.version>2.24.0</apex.kryo.version> <api-common.version>1.0.0-rc2</api-common.version> <avro.version>1.8.2</avro.version> - <bigquery.version>v2-rev295-1.22.0</bigquery.version> + 
<bigquery.version>v2-rev355-1.22.0</bigquery.version> <bigtable.version>0.9.7.1</bigtable.version> <cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version> <pubsubgrpc.version>0.1.0</pubsubgrpc.version> http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index 5d42a5f..ff89933 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.coders.TextualIntegerCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2; import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; /** @@ -97,6 +98,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar RandomAccessDataCoder.class, StringUtf8Coder.class, TableDestinationCoder.class, + TableDestinationCoderV2.class, TableRowJsonCoder.class, TextualIntegerCoder.class, VarIntCoder.class, http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 0a1306d..76cf7e8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -266,7 +266,7 @@ class BatchLoads<DestinationT> .apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null)) .setCoder( KvCoder.of( - VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()))) + VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of()))) .apply(GroupByKey.<Void, KV<TableDestination, String>>create()) .apply(Values.<Iterable<KV<TableDestination, String>>>create()) .apply( @@ -323,7 +323,7 @@ class BatchLoads<DestinationT> tempTables .apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>()) - .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()))) + .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of()))) .apply( "WriteRenameUntriggered", ParDo.of( http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 78dcdde..7f9e27a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -24,6 +24,7 @@ 
import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hashing; @@ -291,6 +292,13 @@ public class BigQueryHelpers { } } + static class TimePartitioningToJson implements SerializableFunction<TimePartitioning, String> { + @Override + public String apply(TimePartitioning partitioning) { + return toJsonString(partitioning); + } + } + static String createJobIdToken(String jobName, String stepUuid) { return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", "")); } http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index feb085d..29828e4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -31,6 +31,7 @@ import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import 
com.google.common.base.Predicates; @@ -60,9 +61,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRe import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations; import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations; import org.apache.beam.sdk.options.PipelineOptions; @@ -824,6 +827,7 @@ public class BigQueryIO { @Nullable abstract DynamicDestinations<T, ?> getDynamicDestinations(); @Nullable abstract PCollectionView<Map<String, String>> getSchemaFromView(); @Nullable abstract ValueProvider<String> getJsonSchema(); + @Nullable abstract ValueProvider<String> getJsonTimePartitioning(); abstract CreateDisposition getCreateDisposition(); abstract WriteDisposition getWriteDisposition(); /** Table description. Default is empty. 
*/ @@ -854,6 +858,7 @@ public class BigQueryIO { abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations); abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view); abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema); + abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> jsonTimePartitioning); abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition); abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition); abstract Builder<T> setTableDescription(String tableDescription); @@ -1022,6 +1027,33 @@ public class BigQueryIO { return toBuilder().setSchemaFromView(view).build(); } + /** + * Allows newly created tables to include a {@link TimePartitioning} specification. Can only be + * used when writing to a single table. If {@link #to(SerializableFunction)} or + * {@link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be + * set directly in the returned {@link TableDestination}. + */ + public Write<T> withTimePartitioning(TimePartitioning partitioning) { + return withJsonTimePartitioning( + StaticValueProvider.of(BigQueryHelpers.toJsonString(partitioning))); + } + + /** + * Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred + * {@link ValueProvider}. + */ + public Write<T> withTimePartitioning(ValueProvider<TimePartitioning> partition) { + return withJsonTimePartitioning(NestedValueProvider.of( + partition, new TimePartitioningToJson())); + } + + /** + * The same as {@link #withTimePartitioning}, but takes a JSON-serialized object. + */ + public Write<T> withJsonTimePartitioning(ValueProvider<String> partition) { + return toBuilder().setJsonTimePartitioning(partition).build(); + } + /** Specifies whether the table should be created if it does not exist.
*/ public Write<T> withCreateDisposition(CreateDisposition createDisposition) { return toBuilder().setCreateDisposition(createDisposition).build(); @@ -1183,6 +1215,15 @@ public class BigQueryIO { input.isBounded(), method); } + if (getJsonTimePartitioning() != null) { + checkArgument(getDynamicDestinations() == null, + "The supplied DynamicDestinations object can directly set TimePartitioning." + + " There is no need to call BigQueryIO.Write.withTimePartitioning."); + checkArgument(getTableFunction() == null, + "The supplied getTableFunction object can directly set TimePartitioning." + + " There is no need to call BigQueryIO.Write.withTimePartitioning."); + } + DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations(); if (dynamicDestinations == null) { if (getJsonTableRef() != null) { @@ -1205,6 +1246,12 @@ public class BigQueryIO { (DynamicDestinations<T, TableDestination>) dynamicDestinations, getSchemaFromView()); } + + // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning. 
+ if (getJsonTimePartitioning() != null) { + dynamicDestinations = new ConstantTimePartitioningDestinations( + dynamicDestinations, getJsonTimePartitioning()); + } } return expandTyped(input, dynamicDestinations); } http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index 3dc10b0..7f83b83 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -73,7 +73,7 @@ public class CreateTables<DestinationT> } CreateTables<DestinationT> withTestServices(BigQueryServices bqServices) { - return new CreateTables<DestinationT>(createDisposition, bqServices, dynamicDestinations); + return new CreateTables<>(createDisposition, bqServices, dynamicDestinations); } @Override @@ -124,11 +124,14 @@ public class CreateTables<DestinationT> DatasetService datasetService = bqServices.getDatasetService(options); if (!createdTables.contains(tableSpec)) { if (datasetService.getTable(tableReference) == null) { - datasetService.createTable( - new Table() - .setTableReference(tableReference) - .setSchema(tableSchema) - .setDescription(tableDescription)); + Table table = new Table() + .setTableReference(tableReference) + .setSchema(tableSchema) + .setDescription(tableDescription); + if (tableDestination.getTimePartitioning() != null) { + table.setTimePartitioning(tableDestination.getTimePartitioning()); + } + datasetService.createTable(table); } createdTables.add(tableSpec); } 
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 530e2b6..818ea34 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -108,7 +108,7 @@ class DynamicDestinationsHelpers { @Override public Coder<TableDestination> getDestinationCoder() { - return TableDestinationCoder.of(); + return TableDestinationCoderV2.of(); } } @@ -164,6 +164,31 @@ class DynamicDestinationsHelpers { } } + static class ConstantTimePartitioningDestinations<T> + extends DelegatingDynamicDestinations<T, TableDestination> { + + @Nullable + private final ValueProvider<String> jsonTimePartitioning; + + ConstantTimePartitioningDestinations(DynamicDestinations<T, TableDestination> inner, + ValueProvider<String> jsonTimePartitioning) { + super(inner); + this.jsonTimePartitioning = jsonTimePartitioning; + } + + @Override + public TableDestination getDestination(ValueInSingleWindow<T> element) { + TableDestination destination = super.getDestination(element); + return new TableDestination(destination.getTableSpec(), destination.getTableDescription(), + jsonTimePartitioning.get()); + } + + @Override + public Coder<TableDestination> getDestinationCoder() { + return TableDestinationCoderV2.of(); + } + } + /** * Takes in a side input mapping tablespec to json table schema, and always returns the * matching schema from the side input. 
http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java index ecf35d8..79f1b22 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TimePartitioning; import java.io.Serializable; import java.util.Objects; import javax.annotation.Nullable; @@ -31,18 +32,38 @@ public class TableDestination implements Serializable { private final String tableSpec; @Nullable private final String tableDescription; + @Nullable + private final String jsonTimePartitioning; public TableDestination(String tableSpec, @Nullable String tableDescription) { - this.tableSpec = tableSpec; - this.tableDescription = tableDescription; + this(tableSpec, tableDescription, (String) null); } public TableDestination(TableReference tableReference, @Nullable String tableDescription) { - this.tableSpec = BigQueryHelpers.toTableSpec(tableReference); + this(tableReference, tableDescription, null); + } + + public TableDestination(TableReference tableReference, @Nullable String tableDescription, + TimePartitioning timePartitioning) { + this(BigQueryHelpers.toTableSpec(tableReference), tableDescription, + timePartitioning != null ? 
BigQueryHelpers.toJsonString(timePartitioning) : null); + } + + public TableDestination(String tableSpec, @Nullable String tableDescription, + TimePartitioning timePartitioning) { + this(tableSpec, tableDescription, + timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null); + } + + public TableDestination(String tableSpec, @Nullable String tableDescription, + @Nullable String jsonTimePartitioning) { + this.tableSpec = tableSpec; this.tableDescription = tableDescription; + this.jsonTimePartitioning = jsonTimePartitioning; } + public String getTableSpec() { return tableSpec; } @@ -51,6 +72,18 @@ public class TableDestination implements Serializable { return BigQueryHelpers.parseTableSpec(tableSpec); } + public String getJsonTimePartitioning() { + return jsonTimePartitioning; + } + + public TimePartitioning getTimePartitioning() { + if (jsonTimePartitioning == null) { + return null; + } else { + return BigQueryHelpers.fromJsonString(jsonTimePartitioning, TimePartitioning.class); + } + } + @Nullable public String getTableDescription() { return tableDescription; http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java index f034a03..2bfc2ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java @@ -33,6 +33,8 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> { private static final Coder<String> tableSpecCoder = 
StringUtf8Coder.of(); private static final Coder<String> tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of()); + private TableDestinationCoder() {} + public static TableDestinationCoder of() { return INSTANCE; } http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java new file mode 100644 index 0000000..5bdab0d --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoderV2.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.gcp.bigquery; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; + +/** + * A {@link Coder} for {@link TableDestination} that includes time partitioning information. This + * is a new coder (instead of extending the old {@link TableDestinationCoder}) for compatibility + * reasons. The old coder is kept around for the same compatibility reasons. + */ +public class TableDestinationCoderV2 extends AtomicCoder<TableDestination> { + private static final TableDestinationCoderV2 INSTANCE = new TableDestinationCoderV2(); + private static final Coder<String> timePartitioningCoder = NullableCoder.of(StringUtf8Coder.of()); + + // Singleton, like TableDestinationCoder: obtain the instance via of(). + private TableDestinationCoderV2() {} + + public static TableDestinationCoderV2 of() { + return INSTANCE; + } + + @Override + public void encode(TableDestination value, OutputStream outStream) throws IOException { + TableDestinationCoder.of().encode(value, outStream); + timePartitioningCoder.encode(value.getJsonTimePartitioning(), outStream); + } + + @Override + public TableDestination decode(InputStream inStream) throws IOException { + TableDestination destination = TableDestinationCoder.of().decode(inStream); + String jsonTimePartitioning = timePartitioningCoder.decode(inStream); + return new TableDestination( + destination.getTableSpec(), destination.getTableDescription(), jsonTimePartitioning); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException {} +} http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java ---------------------------------------------------------------------- diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index c8fab75..a646f17 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -23,6 +23,7 @@ import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -135,6 +136,7 @@ class WriteTables<DestinationT> bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, tableReference, + tableDestination.getTimePartitioning(), tableSchema, partitionFiles, writeDisposition, @@ -150,6 +152,7 @@ class WriteTables<DestinationT> DatasetService datasetService, String jobIdPrefix, TableReference ref, + TimePartitioning timePartitioning, @Nullable TableSchema schema, List<String> gcsUris, WriteDisposition writeDisposition, @@ -164,7 +167,9 @@ class WriteTables<DestinationT> .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()) .setSourceFormat("NEWLINE_DELIMITED_JSON"); - + if (timePartitioning != null) { + loadConfig.setTimePartitioning(timePartitioning); + } String projectId = ref.getProjectId(); Job lastFailedLoadJob = null; for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) { http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 0ece3ee..18547cd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -47,6 +47,7 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; @@ -638,6 +639,55 @@ public class BigQueryIOTest implements Serializable { } @Test + public void testTimePartitioningStreamingInserts() throws Exception { + testTimePartitioning(Method.STREAMING_INSERTS); + } + + @Test + public void testTimePartitioningBatchLoads() throws Exception { + testTimePartitioning(Method.FILE_LOADS); + } + + public void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("project-id"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeDatasetService datasetService = new FakeDatasetService(); + FakeBigQueryServices fakeBqServices = + new FakeBigQueryServices() + .withJobService(new FakeJobService()) + .withDatasetService(datasetService); + datasetService.createDataset("project-id", "dataset-id", "", ""); + + Pipeline p = TestPipeline.create(bqOptions); + 
TableRow row1 = new TableRow().set("name", "a").set("number", "1"); + TableRow row2 = new TableRow().set("name", "b").set("number", "2"); + + TimePartitioning timePartitioning = new TimePartitioning() + .setType("DAY") + .setExpirationMs(1000L); + TableSchema schema = new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER"))); + p.apply(Create.of(row1, row2)) + .apply( + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table-id") + .withTestServices(fakeBqServices) + .withMethod(insertMethod) + .withSchema(schema) + .withTimePartitioning(timePartitioning) + .withoutValidation()); + p.run(); + Table table = datasetService.getTable( + BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id")); + assertEquals(schema, table.getSchema()); + assertEquals(timePartitioning, table.getTimePartitioning()); + } + + @Test @Category({ValidatesRunner.class, UsesTestStream.class}) public void testTriggeredFileLoads() throws Exception { BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index 7d5101d..cc600d1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static com.google.common.base.Preconditions.checkArgument; +import static
com.google.common.base.Preconditions.checkNotNull; import com.google.api.client.json.JsonFactory; import com.google.api.client.util.BackOff; @@ -39,6 +40,7 @@ import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.bigquery.model.TimePartitioning; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -310,8 +312,13 @@ class FakeJobService implements JobService, Serializable { if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); } - - datasetService.createTable(new Table().setTableReference(destination).setSchema(schema)); + if (existingTable == null) { + existingTable = new Table().setTableReference(destination).setSchema(schema); + if (load.getTimePartitioning() != null) { + existingTable = existingTable.setTimePartitioning(load.getTimePartitioning()); + } + datasetService.createTable(existingTable); + } List<TableRow> rows = Lists.newArrayList(); for (ResourceId filename : sourceFiles) { @@ -331,13 +338,30 @@ class FakeJobService implements JobService, Serializable { if (!validateDispositions(existingTable, createDisposition, writeDisposition)) { return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); } - + TimePartitioning partitioning = null; + TableSchema schema = null; + boolean first = true; List<TableRow> allRows = Lists.newArrayList(); for (TableReference source : sources) { + Table table = checkNotNull(datasetService.getTable(source)); + if (!first) { + // Compare by value: each temp table is created from a separately deserialized + // TimePartitioning / TableSchema, so reference comparison would reject identical configs. + if (!java.util.Objects.equals(partitioning, table.getTimePartitioning())) { + return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); + } + if (!java.util.Objects.equals(schema, table.getSchema())) { + return new
JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); + } + } + partitioning = table.getTimePartitioning(); + schema = table.getSchema(); + first = false; allRows.addAll(datasetService.getAllRows( source.getProjectId(), source.getDatasetId(), source.getTableId())); } - datasetService.createTable(new Table().setTableReference(destination)); + datasetService.createTable(new Table() + .setTableReference(destination) + .setSchema(schema) + .setTimePartitioning(partitioning)); datasetService.insertAll(destination, allRows, null); return new JobStatus().setState("DONE"); } http://git-wip-us.apache.org/repos/asf/beam/blob/b0e03a33/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java index 8915069..e016c98 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableContainer.java @@ -32,6 +32,7 @@ class TableContainer { Long sizeBytes; TableContainer(Table table) { this.table = table; + this.rows = new ArrayList<>(); this.ids = new ArrayList<>(); this.sizeBytes = 0L; @@ -54,6 +55,7 @@ class TableContainer { return table; } + List<TableRow> getRows() { return rows; }
